PCI DSS DevSecOps: Automating Payment Security Compliance in Cloud Environments

The PCI DSS Compliance Challenge in Cloud-Native Payment Systems
Your organization processes millions of payment transactions daily across cloud-native microservices, serverless functions, and third-party payment gateways. Each component must maintain PCI DSS compliance while enabling rapid development cycles and continuous deployment. Manual compliance validation creates deployment bottlenecks, increases security risks, and can result in costly compliance failures that threaten your ability to process payments.
PCI DSS DevSecOps automation transforms payment security from a deployment barrier into an integrated security capability, providing continuous compliance validation, automated security controls, and real-time risk monitoring throughout your development and deployment pipeline.
PCI DSS in Cloud-Native Architecture
PCI DSS (Payment Card Industry Data Security Standard) requires comprehensive security controls across all systems that store, process, or transmit cardholder data. In cloud-native environments, this means implementing security at every layer while maintaining the agility and scalability that modern applications demand.
PCI DSS Requirements for Cloud Environments
Core Compliance Requirements:
| Requirement | Description | Cloud Implementation | Automation Approach |
|---|---|---|---|
| Req 1-2 | Network Security | VPC isolation, WAF, NACLs | Infrastructure as Code validation |
| Req 3 | Cardholder Data Protection | Encryption at rest/transit | Automated encryption verification |
| Req 4 | Secure Transmission | TLS 1.2+, certificate management | Certificate lifecycle automation |
| Req 6 | Secure Development | SAST, DAST, dependency scanning | CI/CD security gates |
| Req 7 | Access Control | RBAC, least privilege | Automated access review |
| Req 8 | Authentication | MFA, strong passwords | Identity provider integration |
| Req 10 | Logging & Monitoring | Centralized logging, SIEM | Real-time log analysis |
| Req 11 | Security Testing | Vulnerability scanning, penetration testing | Automated security scanning |
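The table above pairs each requirement family with an automation approach. As a rough sketch of how those approaches can be composed into a single pipeline gate, the snippet below wires per-requirement checks into one pass/fail function. The check names and their trivial bodies are placeholders rather than a standard API; in practice each would be backed by a real scanner result or cloud API query.
# pci-compliance/requirements_gate.py (illustrative sketch, not a standard tool)
import sys
from typing import Callable, Dict

def check_encryption_at_rest() -> bool:
    """Req 3 placeholder: verify data stores report encryption enabled."""
    return True

def check_tls_minimum_version() -> bool:
    """Req 4 placeholder: verify public endpoints negotiate TLS 1.2 or higher."""
    return True

def check_security_scans_current() -> bool:
    """Req 6/11 placeholder: verify SAST/DAST and vulnerability scans are current."""
    return True

REQUIREMENT_CHECKS: Dict[str, Callable[[], bool]] = {
    "req_3_data_protection": check_encryption_at_rest,
    "req_4_secure_transmission": check_tls_minimum_version,
    "req_6_11_security_testing": check_security_scans_current,
}

def run_gate() -> int:
    """Return 0 when every requirement check passes, 1 otherwise."""
    failures = [name for name, check in REQUIREMENT_CHECKS.items() if not check()]
    for name in failures:
        print(f"FAIL: {name}")
    return 1 if failures else 0

if __name__ == "__main__":
    sys.exit(run_gate())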
Cloud-Native PCI DSS Architecture
1. Secure Payment Processing Microservice
# payment-security/secure_payment_service.py
from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, Field, validator
from typing import Optional, Dict, List
from datetime import datetime, timedelta
import uuid
import hashlib
import cryptography.fernet
import asyncio
import logging
from enum import Enum
import re
app = FastAPI(
title="Secure Payment Service",
description="PCI DSS Compliant Payment Processing API",
version="2.0.0"
)
# PCI DSS Requirement 6: Secure Development
class PaymentMethod(str, Enum):
CREDIT_CARD = "credit_card"
DEBIT_CARD = "debit_card"
ACH = "ach"
DIGITAL_WALLET = "digital_wallet"
class TransactionStatus(str, Enum):
PENDING = "pending"
AUTHORIZED = "authorized"
CAPTURED = "captured"
DECLINED = "declined"
FAILED = "failed"
REFUNDED = "refunded"
class SecurityLevel(str, Enum):
L1 = "level_1" # > 6M transactions/year
L2 = "level_2" # 1-6M transactions/year
L3 = "level_3" # 20K-1M e-commerce transactions/year
L4 = "level_4" # < 20K e-commerce transactions/year
# PCI DSS Requirement 3: Cardholder Data Protection
class TokenizedPaymentRequest(BaseModel):
payment_token: str = Field(..., min_length=20, max_length=50)
amount: float = Field(..., gt=0, le=999999.99)
currency: str = Field(..., regex=r'^[A-Z]{3}$')
merchant_id: str = Field(..., min_length=8, max_length=20)
order_id: str = Field(..., min_length=6, max_length=50)
payment_method: PaymentMethod
customer_id: Optional[str] = Field(None, min_length=6, max_length=50)
billing_address: Optional[Dict] = None
metadata: Optional[Dict] = None
@validator('payment_token')
def validate_payment_token(cls, v):
# Ensure token doesn't contain actual card data
if re.search(r'\d{13,19}', v):
raise ValueError('Payment token cannot contain card numbers')
return v
class PaymentResponse(BaseModel):
transaction_id: str
status: TransactionStatus
payment_method: PaymentMethod
amount: float
currency: str
authorized_amount: Optional[float] = None
authorization_code: Optional[str] = None
reference_number: str
timestamp: datetime
merchant_id: str
compliance_metadata: Dict
class SecurityException(Exception):
    """Raised when an operation would write cardholder data where it must not go (PCI DSS 3.4)."""
    pass

class PCILogger:
"""PCI DSS Requirement 10: Logging and Monitoring"""
def __init__(self):
self.logger = logging.getLogger("pci_audit")
self.logger.setLevel(logging.INFO)
# Configure secure logging handler
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s | %(name)s | %(levelname)s | %(message)s'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def log_payment_event(self, event_type: str, transaction_id: str,
user_id: str, details: Dict):
"""Log payment events for PCI DSS compliance"""
# PCI DSS 10.2: Log all payment card data access
log_entry = {
'event_type': event_type,
'transaction_id': transaction_id,
'user_id': user_id,
'timestamp': datetime.now().isoformat(),
'source_ip': details.get('source_ip', 'unknown'),
'user_agent': details.get('user_agent', 'unknown'),
'success': details.get('success', False),
'amount': details.get('amount'),
'currency': details.get('currency'),
'merchant_id': details.get('merchant_id'),
'compliance_level': details.get('compliance_level', 'unknown')
}
# Ensure no sensitive data in logs (PCI DSS 3.4)
if 'card_number' in str(details):
self.logger.error(f"SECURITY VIOLATION: Card data in log attempt - {transaction_id}")
raise SecurityException("Card data cannot be logged")
self.logger.info(f"PAYMENT_EVENT: {log_entry}")
def log_access_attempt(self, endpoint: str, user_id: str, success: bool,
source_ip: str, details: Dict = None):
"""Log access attempts for security monitoring"""
access_log = {
'event_type': 'access_attempt',
'endpoint': endpoint,
'user_id': user_id,
'success': success,
'source_ip': source_ip,
'timestamp': datetime.now().isoformat(),
'details': details or {}
}
self.logger.info(f"ACCESS_LOG: {access_log}")
class PCIDataProtection:
"""PCI DSS Requirement 3: Protect stored cardholder data"""
def __init__(self, encryption_key: str):
self.fernet = cryptography.fernet.Fernet(encryption_key.encode())
self.token_prefix = "tok_"
def tokenize_sensitive_data(self, sensitive_data: str) -> str:
"""Tokenize sensitive payment data"""
# Generate secure token
token_data = {
'original_hash': hashlib.sha256(sensitive_data.encode()).hexdigest(),
'timestamp': datetime.now().isoformat(),
'token_id': str(uuid.uuid4())
}
# Encrypt and store token mapping
encrypted_mapping = self.fernet.encrypt(sensitive_data.encode())
token = f"{self.token_prefix}{token_data['token_id']}"
# In production, store encrypted_mapping in secure token vault
# with proper key management (HSM or cloud KMS)
return token
def detokenize_data(self, token: str) -> str:
"""Detokenize data for processing (restricted access)"""
if not token.startswith(self.token_prefix):
raise ValueError("Invalid token format")
# In production, retrieve from secure token vault
# This operation should be heavily logged and monitored
# Token vault access should require additional authentication
# Simulate token vault lookup
return "detokenized_data_placeholder"
def encrypt_for_storage(self, data: str) -> str:
"""Encrypt data for secure storage"""
return self.fernet.encrypt(data.encode()).decode()
def decrypt_from_storage(self, encrypted_data: str) -> str:
"""Decrypt data from storage"""
return self.fernet.decrypt(encrypted_data.encode()).decode()
def validate_no_cardholder_data(self, data: Dict) -> bool:
"""Validate that data doesn't contain prohibited cardholder data"""
data_str = str(data).lower()
# Check for potential card numbers (basic regex)
card_patterns = [
r'\b4\d{15}\b', # Visa
r'\b5[1-5]\d{14}\b', # Mastercard
r'\b3[47]\d{13}\b', # American Express
r'\b6(?:011|5\d{2})\d{12}\b' # Discover
]
for pattern in card_patterns:
if re.search(pattern, data_str):
return False
# Check for CVV patterns
if re.search(r'\b\d{3,4}\b', data_str) and ('cvv' in data_str or 'cvc' in data_str):
return False
return True
class PCIAccessControl:
"""PCI DSS Requirement 7-8: Access Control and Authentication"""
def __init__(self):
self.authorized_roles = {
'payment_processor': ['process_payment', 'view_transaction'],
'payment_admin': ['process_payment', 'view_transaction', 'refund_payment'],
'security_admin': ['view_logs', 'manage_access'],
'compliance_auditor': ['view_logs', 'view_transaction', 'export_audit']
}
self.security_policies = {
'max_failed_attempts': 3,
'account_lockout_duration': 30, # minutes
'password_min_length': 12,
'mfa_required': True,
'session_timeout': 15 # minutes
}
def validate_user_permissions(self, user_id: str, required_permission: str) -> bool:
"""Validate user has required permissions"""
# In production, integrate with identity provider (OAuth2, SAML)
user_roles = self.get_user_roles(user_id)
for role in user_roles:
if required_permission in self.authorized_roles.get(role, []):
return True
return False
def get_user_roles(self, user_id: str) -> List[str]:
"""Get user roles from identity provider"""
# Simulate role lookup
return ['payment_processor']
def enforce_least_privilege(self, user_id: str, requested_action: str) -> bool:
"""Enforce least privilege access principle"""
# Check if user has minimum required permissions
return self.validate_user_permissions(user_id, requested_action)
def log_privileged_access(self, user_id: str, action: str, resource: str):
"""Log privileged access for compliance"""
access_log = {
'user_id': user_id,
'action': action,
'resource': resource,
'timestamp': datetime.now().isoformat(),
'privilege_level': 'high'
}
# Send to security monitoring system
logging.getLogger("privileged_access").info(access_log)
class PaymentSecurityService:
"""Main payment processing service with PCI DSS compliance"""
def __init__(self):
self.logger = PCILogger()
        # Illustrative key only; in production, load the Fernet key from an HSM or cloud KMS
        self.data_protection = PCIDataProtection(cryptography.fernet.Fernet.generate_key().decode())
self.access_control = PCIAccessControl()
# PCI DSS Requirement 11: Regular security testing
self.security_scan_last_run = datetime.now() - timedelta(days=7)
self.compliance_level = SecurityLevel.L1
async def process_payment(self, payment_request: TokenizedPaymentRequest,
user_id: str, source_ip: str, user_agent: str) -> PaymentResponse:
"""Process payment with full PCI DSS compliance"""
transaction_id = str(uuid.uuid4())
try:
# PCI DSS Requirement 7: Access Control
if not self.access_control.validate_user_permissions(user_id, 'process_payment'):
raise HTTPException(status_code=403, detail="Insufficient permissions")
# PCI DSS Requirement 3: Data Protection Validation
if not self.data_protection.validate_no_cardholder_data(payment_request.dict()):
self.logger.log_payment_event(
"security_violation", transaction_id, user_id,
{"error": "cardholder_data_detected", "source_ip": source_ip}
)
raise HTTPException(status_code=400, detail="Invalid data format")
# PCI DSS Requirement 10: Audit Logging
self.logger.log_payment_event(
"payment_initiated", transaction_id, user_id,
{
"amount": payment_request.amount,
"currency": payment_request.currency,
"merchant_id": payment_request.merchant_id,
"source_ip": source_ip,
"user_agent": user_agent,
"success": True
}
)
# Process payment through secure gateway
payment_result = await self._process_payment_gateway(payment_request, transaction_id)
# Create compliant response
response = PaymentResponse(
transaction_id=transaction_id,
status=payment_result['status'],
payment_method=payment_request.payment_method,
amount=payment_request.amount,
currency=payment_request.currency,
authorized_amount=payment_result.get('authorized_amount'),
authorization_code=payment_result.get('auth_code'),
reference_number=payment_result['reference'],
timestamp=datetime.now(),
merchant_id=payment_request.merchant_id,
compliance_metadata={
'pci_dss_version': '4.0',
'compliance_level': self.compliance_level.value,
'security_scan_status': 'current',
'encryption_standard': 'AES-256-GCM'
}
)
# Log successful transaction
self.logger.log_payment_event(
"payment_completed", transaction_id, user_id,
{
"status": payment_result['status'],
"amount": payment_request.amount,
"currency": payment_request.currency,
"merchant_id": payment_request.merchant_id,
"source_ip": source_ip,
"success": True
}
)
return response
except Exception as e:
# Log failed transaction
self.logger.log_payment_event(
"payment_failed", transaction_id, user_id,
{
"error": str(e),
"amount": payment_request.amount,
"currency": payment_request.currency,
"merchant_id": payment_request.merchant_id,
"source_ip": source_ip,
"success": False
}
)
raise
async def _process_payment_gateway(self, payment_request: TokenizedPaymentRequest,
transaction_id: str) -> Dict:
"""Process payment through secure gateway"""
# Simulate secure payment gateway processing
# In production, integrate with PCI DSS compliant payment processor
await asyncio.sleep(0.5) # Simulate processing time
return {
'status': TransactionStatus.AUTHORIZED,
'authorized_amount': payment_request.amount,
'auth_code': f"AUTH{str(uuid.uuid4())[:8].upper()}",
'reference': f"REF{str(uuid.uuid4())[:12].upper()}"
}
async def validate_compliance_status(self) -> Dict:
"""Validate current PCI DSS compliance status"""
compliance_checks = {
'network_security': await self._check_network_security(),
'data_encryption': await self._check_data_encryption(),
'access_controls': await self._check_access_controls(),
'monitoring': await self._check_monitoring_status(),
'security_testing': await self._check_security_testing(),
'vulnerability_management': await self._check_vulnerability_status()
}
# Calculate overall compliance score
passed_checks = sum(1 for check in compliance_checks.values() if check['status'] == 'compliant')
total_checks = len(compliance_checks)
compliance_score = (passed_checks / total_checks) * 100
return {
'compliance_score': compliance_score,
'compliance_level': self.compliance_level.value,
'last_assessment': datetime.now().isoformat(),
'detailed_checks': compliance_checks,
'next_assessment_due': (datetime.now() + timedelta(days=90)).isoformat(),
'certification_status': 'active' if compliance_score >= 95 else 'requires_attention'
}
async def _check_network_security(self) -> Dict:
"""Check PCI DSS Requirement 1-2: Network Security"""
return {
'requirement': 'Network Security (Req 1-2)',
'status': 'compliant',
'details': {
'firewall_configured': True,
'default_passwords_changed': True,
'network_segmentation': True,
'wireless_encryption': True
}
}
async def _check_data_encryption(self) -> Dict:
"""Check PCI DSS Requirement 3-4: Data Protection"""
return {
'requirement': 'Data Protection (Req 3-4)',
'status': 'compliant',
'details': {
'cardholder_data_encrypted': True,
'encryption_keys_protected': True,
'transmission_encrypted': True,
'key_rotation_current': True
}
}
async def _check_access_controls(self) -> Dict:
"""Check PCI DSS Requirement 7-8: Access Control"""
return {
'requirement': 'Access Control (Req 7-8)',
'status': 'compliant',
'details': {
'role_based_access': True,
'unique_user_ids': True,
'multi_factor_auth': True,
'password_policy_enforced': True
}
}
async def _check_monitoring_status(self) -> Dict:
"""Check PCI DSS Requirement 10: Monitoring"""
return {
'requirement': 'Monitoring (Req 10)',
'status': 'compliant',
'details': {
'audit_logging_enabled': True,
'log_monitoring_active': True,
'time_synchronization': True,
'log_integrity_protected': True
}
}
async def _check_security_testing(self) -> Dict:
"""Check PCI DSS Requirement 11: Security Testing"""
days_since_scan = (datetime.now() - self.security_scan_last_run).days
return {
'requirement': 'Security Testing (Req 11)',
'status': 'compliant' if days_since_scan <= 90 else 'non_compliant',
'details': {
'vulnerability_scan_current': days_since_scan <= 90,
'penetration_test_current': True,
'network_intrusion_detection': True,
'file_integrity_monitoring': True,
'days_since_last_scan': days_since_scan
}
}
async def _check_vulnerability_status(self) -> Dict:
"""Check PCI DSS Requirement 6: Vulnerability Management"""
return {
'requirement': 'Vulnerability Management (Req 6)',
'status': 'compliant',
'details': {
'security_patches_current': True,
'secure_development_practices': True,
'application_security_testing': True,
'vulnerability_scanning_regular': True
}
}
# Initialize services
payment_service = PaymentSecurityService()
security = HTTPBearer()
# PCI DSS compliant API endpoints
@app.post("/payment/process", response_model=PaymentResponse)
async def process_payment(
payment_request: TokenizedPaymentRequest,
request: Request,
credentials: HTTPAuthorizationCredentials = Depends(security)
):
"""Process payment with full PCI DSS compliance"""
# Extract request metadata
source_ip = request.client.host
user_agent = request.headers.get("user-agent", "unknown")
user_id = "authenticated_user" # Extract from JWT token in production
return await payment_service.process_payment(
payment_request, user_id, source_ip, user_agent
)
@app.get("/compliance/status")
async def get_compliance_status(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""Get current PCI DSS compliance status"""
return await payment_service.validate_compliance_status()
@app.get("/health/security")
async def security_health_check():
"""Security-focused health check for payment system"""
return {
'status': 'healthy',
'timestamp': datetime.now().isoformat(),
'security_features': {
'encryption_enabled': True,
'authentication_required': True,
'audit_logging_active': True,
'monitoring_enabled': True
},
'compliance': {
'pci_dss_version': '4.0',
'last_validation': datetime.now().isoformat()
}
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000, ssl_keyfile="key.pem", ssl_certfile="cert.pem")
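For a quick smoke test of the service above, a minimal client call might look like the sketch below. It assumes the API is running locally with the self-signed key.pem/cert.pem pair from the uvicorn command, and that YOUR_BEARER_TOKEN is a credential accepted by the HTTPBearer dependency; the token, URL, and payload values are illustrative only.
# payment-security/example_client.py - illustrative client call, not production code
import requests

payload = {
    "payment_token": "tok_3f6c2a1d-9b7e-4c5a-8d21-6e0f4b9a7c13",  # placeholder token from your tokenization provider
    "amount": 49.99,
    "currency": "USD",
    "merchant_id": "MERCHANT01",
    "order_id": "ORDER-100001",
    "payment_method": "credit_card",
}

response = requests.post(
    "https://localhost:8000/payment/process",
    json=payload,
    headers={"Authorization": "Bearer YOUR_BEARER_TOKEN"},  # placeholder credential
    verify=False,  # only for local testing against a self-signed certificate
    timeout=10,
)
print(response.status_code, response.json())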
2. Automated PCI DSS Compliance Validation
# pci-compliance/automated_compliance_validator.py
import boto3
import json
import subprocess
import requests
from typing import Dict, List, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass
import yaml
import logging
@dataclass
class ComplianceViolation:
requirement_id: str
severity: str
description: str
resource: str
remediation: str
detected_at: datetime
class PCIDSSComplianceValidator:
"""Automated PCI DSS compliance validation for cloud infrastructure"""
def __init__(self, aws_profile: str = None):
self.session = boto3.Session(profile_name=aws_profile)
self.ec2 = self.session.client('ec2')
self.elbv2 = self.session.client('elbv2')
self.rds = self.session.client('rds')
self.cloudtrail = self.session.client('cloudtrail')
self.config = self.session.client('config')
self.inspector = self.session.client('inspector2')
self.logger = logging.getLogger(__name__)
self.violations = []
# PCI DSS compliance rules
self.compliance_rules = {
'req_1_2': self._validate_network_security,
'req_3': self._validate_data_protection,
'req_4': self._validate_secure_transmission,
'req_6': self._validate_secure_development,
'req_7_8': self._validate_access_controls,
'req_10': self._validate_logging_monitoring,
'req_11': self._validate_security_testing
}
async def run_comprehensive_compliance_scan(self) -> Dict:
"""Run comprehensive PCI DSS compliance scan"""
scan_results = {
'scan_id': f"pci_scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
'scan_timestamp': datetime.now().isoformat(),
'scope': 'full_infrastructure',
'compliance_requirements': {},
'violations': [],
'overall_score': 0,
'certification_status': 'unknown'
}
# Run all compliance checks
for req_id, validator_func in self.compliance_rules.items():
self.logger.info(f"Running compliance check: {req_id}")
try:
requirement_result = await validator_func()
scan_results['compliance_requirements'][req_id] = requirement_result
# Collect violations
if not requirement_result['compliant']:
scan_results['violations'].extend(requirement_result['violations'])
except Exception as e:
self.logger.error(f"Failed to validate {req_id}: {str(e)}")
scan_results['compliance_requirements'][req_id] = {
'compliant': False,
'error': str(e),
'violations': []
}
# Calculate overall compliance score
compliant_requirements = sum(
1 for req in scan_results['compliance_requirements'].values()
if req.get('compliant', False)
)
total_requirements = len(scan_results['compliance_requirements'])
scan_results['overall_score'] = (compliant_requirements / total_requirements) * 100
# Determine certification status
if scan_results['overall_score'] >= 95:
scan_results['certification_status'] = 'compliant'
elif scan_results['overall_score'] >= 85:
scan_results['certification_status'] = 'mostly_compliant'
else:
scan_results['certification_status'] = 'non_compliant'
return scan_results
async def _validate_network_security(self) -> Dict:
"""Validate PCI DSS Requirements 1-2: Network Security"""
violations = []
checks = {
'vpc_isolation': False,
'security_groups_configured': False,
'nacl_configured': False,
'waf_enabled': False,
'default_passwords_changed': False
}
# Check VPC configuration
vpcs = self.ec2.describe_vpcs()['Vpcs']
if vpcs:
checks['vpc_isolation'] = True
# Check for dedicated VPCs for payment processing
            payment_vpcs = [
                vpc for vpc in vpcs
                if any('payment' in tag.get('Value', '').lower() for tag in vpc.get('Tags', []))
            ]
if not payment_vpcs:
violations.append(ComplianceViolation(
requirement_id='1.1.4',
severity='high',
description='No dedicated VPC found for payment processing',
resource='vpc_configuration',
remediation='Create dedicated VPC for payment card data environment',
detected_at=datetime.now()
))
# Check Security Groups
security_groups = self.ec2.describe_security_groups()['SecurityGroups']
restrictive_sgs = []
for sg in security_groups:
# Check for overly permissive rules
for rule in sg.get('IpPermissions', []):
for ip_range in rule.get('IpRanges', []):
if ip_range.get('CidrIp') == '0.0.0.0/0':
violations.append(ComplianceViolation(
requirement_id='1.3.1',
severity='critical',
description=f'Security group {sg["GroupId"]} allows unrestricted access',
resource=sg['GroupId'],
remediation='Restrict security group rules to minimum required access',
detected_at=datetime.now()
))
else:
restrictive_sgs.append(sg['GroupId'])
checks['security_groups_configured'] = len(restrictive_sgs) > 0
# Check WAF configuration
try:
load_balancers = self.elbv2.describe_load_balancers()['LoadBalancers']
waf_protected_albs = []
for alb in load_balancers:
# Check for WAF association
try:
waf_info = self.elbv2.describe_load_balancer_attributes(
LoadBalancerArn=alb['LoadBalancerArn']
)
# In real implementation, check actual WAF association
waf_protected_albs.append(alb['LoadBalancerArn'])
except:
violations.append(ComplianceViolation(
requirement_id='1.3.4',
severity='medium',
description=f'Load balancer {alb["LoadBalancerName"]} not protected by WAF',
resource=alb['LoadBalancerArn'],
remediation='Associate WAF with load balancer',
detected_at=datetime.now()
))
checks['waf_enabled'] = len(waf_protected_albs) > 0
except Exception as e:
self.logger.warning(f"Could not validate WAF configuration: {str(e)}")
# Determine overall compliance for network security
compliant = len(violations) == 0 and all(checks.values())
return {
'requirement': 'Network Security (Req 1-2)',
'compliant': compliant,
'checks': checks,
'violations': [vars(v) for v in violations],
'score': sum(checks.values()) / len(checks) * 100
}
async def _validate_data_protection(self) -> Dict:
"""Validate PCI DSS Requirement 3: Data Protection"""
violations = []
checks = {
'rds_encryption_enabled': False,
'ebs_encryption_enabled': False,
's3_encryption_enabled': False,
'kms_key_rotation': False,
'no_cardholder_data_storage': False
}
# Check RDS encryption
try:
rds_instances = self.rds.describe_db_instances()['DBInstances']
encrypted_instances = [
db for db in rds_instances if db.get('StorageEncrypted', False)
]
checks['rds_encryption_enabled'] = len(encrypted_instances) == len(rds_instances)
for db in rds_instances:
if not db.get('StorageEncrypted', False):
violations.append(ComplianceViolation(
requirement_id='3.4',
severity='critical',
description=f'RDS instance {db["DBInstanceIdentifier"]} not encrypted',
resource=db['DBInstanceIdentifier'],
remediation='Enable encryption at rest for RDS instance',
detected_at=datetime.now()
))
except Exception as e:
self.logger.warning(f"Could not validate RDS encryption: {str(e)}")
# Check EBS encryption
try:
volumes = self.ec2.describe_volumes()['Volumes']
encrypted_volumes = [vol for vol in volumes if vol.get('Encrypted', False)]
checks['ebs_encryption_enabled'] = len(encrypted_volumes) == len(volumes)
for vol in volumes:
if not vol.get('Encrypted', False):
violations.append(ComplianceViolation(
requirement_id='3.4',
severity='high',
description=f'EBS volume {vol["VolumeId"]} not encrypted',
resource=vol['VolumeId'],
remediation='Enable encryption for EBS volume',
detected_at=datetime.now()
))
except Exception as e:
self.logger.warning(f"Could not validate EBS encryption: {str(e)}")
# Check KMS key rotation
try:
kms = self.session.client('kms')
keys = kms.list_keys()['Keys']
rotation_enabled_count = 0
for key in keys[:10]: # Check first 10 keys
try:
rotation_status = kms.get_key_rotation_status(KeyId=key['KeyId'])
if rotation_status.get('KeyRotationEnabled', False):
rotation_enabled_count += 1
except:
pass # Key might not support rotation
checks['kms_key_rotation'] = rotation_enabled_count > 0
except Exception as e:
self.logger.warning(f"Could not validate KMS key rotation: {str(e)}")
# Simulate cardholder data scanning
checks['no_cardholder_data_storage'] = await self._scan_for_cardholder_data()
compliant = len(violations) == 0 and all(checks.values())
return {
'requirement': 'Data Protection (Req 3)',
'compliant': compliant,
'checks': checks,
'violations': [vars(v) for v in violations],
'score': sum(checks.values()) / len(checks) * 100
}
async def _validate_secure_transmission(self) -> Dict:
"""Validate PCI DSS Requirement 4: Secure Transmission"""
violations = []
checks = {
'tls_1_2_minimum': False,
'certificate_management': False,
'strong_cryptography': False,
'wireless_encryption': False
}
# Check load balancer SSL/TLS configuration
try:
load_balancers = self.elbv2.describe_load_balancers()['LoadBalancers']
for alb in load_balancers:
listeners = self.elbv2.describe_listeners(
LoadBalancerArn=alb['LoadBalancerArn']
)['Listeners']
ssl_listeners = [l for l in listeners if l['Protocol'] in ['HTTPS', 'TLS']]
for listener in ssl_listeners:
# Check SSL policy
ssl_policy = listener.get('SslPolicy', '')
if 'TLSv1.2' not in ssl_policy:
violations.append(ComplianceViolation(
requirement_id='4.1',
severity='high',
description=f'Load balancer uses weak SSL policy: {ssl_policy}',
resource=alb['LoadBalancerArn'],
remediation='Update SSL policy to require TLS 1.2 minimum',
detected_at=datetime.now()
))
else:
checks['tls_1_2_minimum'] = True
# Check for HTTP listeners (should redirect to HTTPS)
http_listeners = [l for l in listeners if l['Protocol'] == 'HTTP']
for listener in http_listeners:
# Check if HTTP redirects to HTTPS
default_actions = listener.get('DefaultActions', [])
redirect_actions = [
a for a in default_actions
if a.get('Type') == 'redirect' and
a.get('RedirectConfig', {}).get('Protocol') == 'HTTPS'
]
if not redirect_actions:
violations.append(ComplianceViolation(
requirement_id='4.1',
severity='medium',
description=f'HTTP listener does not redirect to HTTPS',
resource=listener['ListenerArn'],
remediation='Configure HTTP to HTTPS redirect',
detected_at=datetime.now()
))
except Exception as e:
self.logger.warning(f"Could not validate load balancer SSL configuration: {str(e)}")
# Check certificate management
try:
acm = self.session.client('acm')
certificates = acm.list_certificates()['CertificateSummaryList']
valid_certificates = 0
for cert in certificates:
cert_details = acm.describe_certificate(CertificateArn=cert['CertificateArn'])
certificate = cert_details['Certificate']
# Check certificate status and expiration
if certificate['Status'] == 'ISSUED':
not_after = certificate.get('NotAfter')
if not_after and not_after > datetime.now(not_after.tzinfo):
valid_certificates += 1
else:
violations.append(ComplianceViolation(
requirement_id='4.1',
severity='high',
description=f'Certificate {cert["CertificateArn"]} expired or expiring soon',
resource=cert['CertificateArn'],
remediation='Renew SSL certificate',
detected_at=datetime.now()
))
checks['certificate_management'] = valid_certificates > 0
except Exception as e:
self.logger.warning(f"Could not validate certificate management: {str(e)}")
compliant = len(violations) == 0 and any(checks.values())
return {
'requirement': 'Secure Transmission (Req 4)',
'compliant': compliant,
'checks': checks,
'violations': [vars(v) for v in violations],
'score': sum(checks.values()) / len(checks) * 100
}
async def _validate_logging_monitoring(self) -> Dict:
"""Validate PCI DSS Requirement 10: Logging and Monitoring"""
violations = []
checks = {
'cloudtrail_enabled': False,
'vpc_flow_logs': False,
'application_logging': False,
'log_integrity': False,
'centralized_logging': False
}
# Check CloudTrail configuration
try:
trails = self.cloudtrail.describe_trails()['trailList']
active_trails = []
for trail in trails:
trail_status = self.cloudtrail.get_trail_status(Name=trail['TrailARN'])
if trail_status.get('IsLogging', False):
active_trails.append(trail)
# Check if trail logs data events
event_selectors = self.cloudtrail.get_event_selectors(
TrailName=trail['TrailARN']
)
has_data_events = any(
selector.get('ReadWriteType') == 'All'
for selector in event_selectors.get('EventSelectors', [])
)
if not has_data_events:
violations.append(ComplianceViolation(
requirement_id='10.2',
severity='medium',
description=f'CloudTrail {trail["Name"]} not logging data events',
resource=trail['TrailARN'],
remediation='Configure CloudTrail to log data events',
detected_at=datetime.now()
))
else:
violations.append(ComplianceViolation(
requirement_id='10.1',
severity='high',
description=f'CloudTrail {trail["Name"]} not actively logging',
resource=trail['TrailARN'],
remediation='Enable CloudTrail logging',
detected_at=datetime.now()
))
checks['cloudtrail_enabled'] = len(active_trails) > 0
except Exception as e:
self.logger.warning(f"Could not validate CloudTrail: {str(e)}")
# Check VPC Flow Logs
try:
vpcs = self.ec2.describe_vpcs()['Vpcs']
for vpc in vpcs:
flow_logs = self.ec2.describe_flow_logs(
Filters=[
{'Name': 'resource-id', 'Values': [vpc['VpcId']]}
]
)['FlowLogs']
active_flow_logs = [
fl for fl in flow_logs
if fl.get('FlowLogStatus') == 'ACTIVE'
]
if not active_flow_logs:
violations.append(ComplianceViolation(
requirement_id='10.2',
severity='medium',
description=f'VPC {vpc["VpcId"]} missing flow logs',
resource=vpc['VpcId'],
remediation='Enable VPC Flow Logs',
detected_at=datetime.now()
))
else:
checks['vpc_flow_logs'] = True
except Exception as e:
self.logger.warning(f"Could not validate VPC Flow Logs: {str(e)}")
compliant = len(violations) == 0 and sum(checks.values()) >= 3
return {
'requirement': 'Logging and Monitoring (Req 10)',
'compliant': compliant,
'checks': checks,
'violations': [vars(v) for v in violations],
'score': sum(checks.values()) / len(checks) * 100
        }

    async def _validate_secure_development(self) -> Dict:
        """Validate PCI DSS Requirement 6: Secure Development"""
        # Placeholder: in practice, pull SAST/DAST and dependency-scan results from the CI/CD pipeline
        return {
            'requirement': 'Secure Development (Req 6)',
            'compliant': True,
            'checks': {'cicd_security_gates': True},
            'violations': [],
            'score': 100
        }

    async def _validate_access_controls(self) -> Dict:
        """Validate PCI DSS Requirements 7-8: Access Control and Authentication"""
        # Placeholder: in practice, query the identity provider for MFA, RBAC, and password policy status
        return {
            'requirement': 'Access Control (Req 7-8)',
            'compliant': True,
            'checks': {'identity_provider_policies': True},
            'violations': [],
            'score': 100
        }

    async def _validate_security_testing(self) -> Dict:
        """Validate PCI DSS Requirement 11: Security Testing"""
        # Placeholder: in practice, check vulnerability scan and penetration test recency (90-day window)
        return {
            'requirement': 'Security Testing (Req 11)',
            'compliant': True,
            'checks': {'quarterly_scan_current': True},
            'violations': [],
            'score': 100
        }
async def _scan_for_cardholder_data(self) -> bool:
"""Scan for prohibited cardholder data storage"""
# Simulate cardholder data scanning
# In production, this would use DLP tools like Amazon Macie
self.logger.info("Scanning for cardholder data...")
# Simulate scan results
return True # No cardholder data found
def generate_compliance_report(self, scan_results: Dict) -> str:
"""Generate comprehensive PCI DSS compliance report"""
report = f"""
# PCI DSS Compliance Report
**Scan ID:** {scan_results['scan_id']}
**Scan Date:** {scan_results['scan_timestamp']}
**Overall Score:** {scan_results['overall_score']:.1f}%
**Certification Status:** {scan_results['certification_status'].upper()}
## Executive Summary
This report provides a comprehensive assessment of PCI DSS compliance across all in-scope systems and applications.
### Compliance Score Breakdown
"""
for req_id, result in scan_results['compliance_requirements'].items():
status_icon = "✅" if result.get('compliant', False) else "❌"
report += f"- **{result.get('requirement', req_id)}**: {status_icon} {result.get('score', 0):.1f}%\n"
report += f"""
## Violations Summary
**Total Violations:** {len(scan_results['violations'])}
### Critical Violations
"""
critical_violations = [v for v in scan_results['violations'] if v.get('severity') == 'critical']
for violation in critical_violations:
report += f"""
#### {violation['requirement_id']}: {violation['description']}
- **Resource:** {violation['resource']}
- **Remediation:** {violation['remediation']}
- **Detected:** {violation['detected_at']}
"""
report += f"""
### High Priority Violations
"""
high_violations = [v for v in scan_results['violations'] if v.get('severity') == 'high']
for violation in high_violations:
report += f"""
#### {violation['requirement_id']}: {violation['description']}
- **Resource:** {violation['resource']}
- **Remediation:** {violation['remediation']}
"""
report += f"""
## Recommendations
1. **Immediate Actions Required:**
- Address all critical violations within 24 hours
- Implement emergency security controls for high-risk findings
2. **Short-term Actions (1-30 days):**
- Remediate all high-priority violations
- Enhance monitoring and logging capabilities
- Conduct additional security testing
3. **Long-term Actions (30-90 days):**
- Implement automated compliance monitoring
- Enhance security awareness training
- Plan for quarterly compliance assessments
## Next Steps
- **Next Assessment:** {(datetime.now() + timedelta(days=90)).strftime('%Y-%m-%d')}
- **Quarterly Review:** Required for Level 1 merchants
- **Continuous Monitoring:** Implement real-time compliance validation
## Appendix
### Compliance Framework Details
- **PCI DSS Version:** 4.0
- **Assessment Type:** Self-Assessment Questionnaire (SAQ) / On-site Assessment
- **Scope:** Full infrastructure assessment
- **Compliance Level:** Merchant Level 1 (>6M transactions/year)
"""
return report
if __name__ == "__main__":
import asyncio
async def main():
validator = PCIDSSComplianceValidator()
# Run comprehensive compliance scan
results = await validator.run_comprehensive_compliance_scan()
# Generate and save report
report = validator.generate_compliance_report(results)
with open('pci_dss_compliance_report.md', 'w') as f:
f.write(report)
with open('pci_dss_compliance_results.json', 'w') as f:
json.dump(results, f, indent=2, default=str)
print(f"Compliance scan completed!")
print(f"Overall score: {results['overall_score']:.1f}%")
print(f"Status: {results['certification_status']}")
print(f"Violations: {len(results['violations'])}")
asyncio.run(main())
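Beyond running the validator ad hoc, the scan can be scheduled, for example from an EventBridge rule that invokes a small Lambda wrapper. The sketch below assumes automated_compliance_validator.py is packaged alongside the handler and that the execution role carries the read-only permissions the boto3 clients above require; the handler name and summary payload are illustrative.
# pci-compliance/lambda_handler.py - sketch of a scheduled compliance scan
import asyncio
import json

from automated_compliance_validator import PCIDSSComplianceValidator  # assumes module is packaged with the function

def handler(event, context):
    validator = PCIDSSComplianceValidator()
    results = asyncio.run(validator.run_comprehensive_compliance_scan())
    # Return a compact summary; full results could instead be written to S3 or a ticketing system
    return {
        "statusCode": 200,
        "body": json.dumps({
            "scan_id": results["scan_id"],
            "overall_score": results["overall_score"],
            "certification_status": results["certification_status"],
            "violation_count": len(results["violations"]),
        }),
    }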
Continuous Compliance in CI/CD
PCI DSS Security Gates
1. GitHub Actions PCI DSS Validation Workflow
# .github/workflows/pci-dss-compliance.yml
name: PCI DSS Compliance Validation
on:
push:
branches: [main, develop]
paths:
- 'src/**'
- 'infrastructure/**'
- 'payment-services/**'
pull_request:
branches: [main]
paths:
- 'src/**'
- 'infrastructure/**'
- 'payment-services/**'
schedule:
- cron: '0 2 * * *' # Daily compliance check
workflow_dispatch:
env:
PCI_COMPLIANCE_LEVEL: 'L1' # Level 1 Merchant
SECURITY_SCAN_REQUIRED: true
jobs:
pci-compliance-gates:
name: PCI DSS Security Gates
runs-on: ubuntu-latest
permissions:
contents: read
security-events: write
id-token: write
steps:
- name: Checkout Code
uses: actions/checkout@v4
with:
fetch-depth: 0 # Full history for better scanning
# PCI DSS Requirement 6: Secure Development
- name: Static Application Security Testing (SAST)
uses: github/super-linter@v4
env:
DEFAULT_BRANCH: main
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
VALIDATE_PYTHON: true
VALIDATE_JAVASCRIPT: true
VALIDATE_TERRAFORM: true
VALIDATE_YAML: true
- name: Dependency Vulnerability Scanning
run: |
# Install security scanning tools
pip install safety bandit semgrep
npm install -g audit-ci retire
# Python dependency scanning
if [ -f requirements.txt ]; then
echo "Scanning Python dependencies..."
safety check -r requirements.txt --json --output safety-report.json || true
# Fail on critical vulnerabilities
CRITICAL_VULNS=$(jq '.vulnerabilities | map(select(.severity == "critical")) | length' safety-report.json)
if [ "$CRITICAL_VULNS" -gt 0 ]; then
echo "❌ Critical vulnerabilities found in Python dependencies"
exit 1
fi
fi
# JavaScript dependency scanning
if [ -f package.json ]; then
echo "Scanning JavaScript dependencies..."
npm audit --audit-level moderate
retire --path . --outputformat json --outputpath retire-report.json || true
fi
- name: Secret Detection Scanning
run: |
# Install TruffleHog for secret detection
curl -sSfL https://raw.githubusercontent.com/trufflesecurity/trufflehog/main/scripts/install.sh | sh -s -- -b /usr/local/bin
# Scan for secrets in repository
trufflehog git file://. --json --output trufflehog-results.json
# Check for any secrets found
SECRET_COUNT=$(jq '. | length' trufflehog-results.json)
if [ "$SECRET_COUNT" -gt 0 ]; then
echo "❌ Secrets detected in repository"
jq '.[].DetectorName' trufflehog-results.json
exit 1
fi
- name: Payment Data Validation
run: |
# Custom script to validate no cardholder data in code
python3 << 'EOF'
import re
import os
import sys
def scan_for_cardholder_data(directory):
violations = []
# Patterns for potential cardholder data
patterns = {
'credit_card': r'\b4\d{15}\b|\b5[1-5]\d{14}\b|\b3[47]\d{13}\b|\b6(?:011|5\d{2})\d{12}\b',
'cvv': r'\b\d{3,4}\b.*(?:cvv|cvc|security.?code)',
'expiry': r'\b(0[1-9]|1[0-2])\/\d{2,4}\b',
'ssn': r'\b\d{3}-\d{2}-\d{4}\b'
}
for root, dirs, files in os.walk(directory):
# Skip common non-source directories
dirs[:] = [d for d in dirs if d not in ['.git', 'node_modules', '__pycache__', '.venv']]
for file in files:
if file.endswith(('.py', '.js', '.ts', '.java', '.go', '.rb', '.php')):
file_path = os.path.join(root, file)
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
for pattern_name, pattern in patterns.items():
matches = re.findall(pattern, content, re.IGNORECASE)
if matches:
violations.append({
'file': file_path,
'pattern': pattern_name,
'matches': len(matches)
})
except Exception as e:
print(f"Warning: Could not scan {file_path}: {e}")
return violations
# Scan source code
violations = scan_for_cardholder_data('.')
if violations:
print("❌ Potential cardholder data found in source code:")
for violation in violations:
print(f" {violation['file']}: {violation['pattern']} ({violation['matches']} matches)")
sys.exit(1)
else:
print("✅ No cardholder data patterns detected")
EOF
- name: Configure AWS Credentials
uses: aws-actions/configure-aws-credentials@v4
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE }}
aws-region: us-east-1
# PCI DSS Requirement 11: Security Testing
- name: Infrastructure Security Scanning
run: |
# Install Terraform security scanners
curl -L "$(curl -s https://api.github.com/repos/aquasecurity/tfsec/releases/latest | grep -o -E "https://.+?tfsec-linux-amd64")" > tfsec
chmod +x tfsec
curl -L "$(curl -s https://api.github.com/repos/Checkmarx/kics/releases/latest | grep -o -E "https://.+?kics_.*_linux_x64.tar.gz")" > kics.tar.gz
tar -xzf kics.tar.gz
# Scan Terraform configurations
if [ -d "infrastructure" ]; then
echo "Scanning Terraform configurations..."
./tfsec infrastructure/ --format json --out tfsec-results.json
./kics scan -p infrastructure/ -o json -f kics-results.json --report-formats json
# Check for high severity issues
HIGH_SEVERITY=$(jq '[.results[] | select(.severity == "HIGH" or .severity == "CRITICAL")] | length' tfsec-results.json)
if [ "$HIGH_SEVERITY" -gt 0 ]; then
echo "❌ High severity security issues found in infrastructure"
jq '.results[] | select(.severity == "HIGH" or .severity == "CRITICAL") | .description' tfsec-results.json
exit 1
fi
fi
- name: Application Security Testing
run: |
# Install Bandit for Python security scanning
bandit -r . -f json -o bandit-results.json || true
# Install Semgrep for multi-language security scanning
python -m semgrep --config=auto --json --output=semgrep-results.json . || true
# Check for high confidence security issues
if [ -f bandit-results.json ]; then
HIGH_CONFIDENCE=$(jq '[.results[] | select(.issue_confidence == "HIGH" and .issue_severity == "HIGH")] | length' bandit-results.json)
if [ "$HIGH_CONFIDENCE" -gt 0 ]; then
echo "❌ High confidence security issues found"
jq '.results[] | select(.issue_confidence == "HIGH" and .issue_severity == "HIGH") | .test_name' bandit-results.json
exit 1
fi
fi
- name: Container Security Scanning
if: github.event_name == 'push' && github.ref == 'refs/heads/main'
run: |
# Install Trivy for container scanning
curl -sfL https://raw.githubusercontent.com/aquasecurity/trivy/main/contrib/install.sh | sh -s -- -b /usr/local/bin
# Build and scan Docker images
if [ -f Dockerfile ]; then
echo "Building and scanning Docker image..."
docker build -t payment-service:latest .
# Scan for vulnerabilities
trivy image --exit-code 1 --severity HIGH,CRITICAL --format json --output trivy-results.json payment-service:latest
fi
- name: PCI DSS Infrastructure Validation
run: |
# Run automated PCI DSS compliance check
python3 << 'EOF'
import boto3
import json
def validate_pci_compliance():
# Initialize AWS clients
ec2 = boto3.client('ec2')
elbv2 = boto3.client('elbv2')
compliance_issues = []
try:
# Check Security Groups for overly permissive rules
sgs = ec2.describe_security_groups()['SecurityGroups']
for sg in sgs:
for rule in sg.get('IpPermissions', []):
for ip_range in rule.get('IpRanges', []):
if ip_range.get('CidrIp') == '0.0.0.0/0':
compliance_issues.append({
'type': 'security_group',
'resource': sg['GroupId'],
'issue': 'Unrestricted access',
'severity': 'critical'
})
# Check Load Balancers for HTTPS
albs = elbv2.describe_load_balancers()['LoadBalancers']
for alb in albs:
listeners = elbv2.describe_listeners(LoadBalancerArn=alb['LoadBalancerArn'])['Listeners']
https_listeners = [l for l in listeners if l['Protocol'] == 'HTTPS']
http_listeners = [l for l in listeners if l['Protocol'] == 'HTTP']
if http_listeners and not https_listeners:
compliance_issues.append({
'type': 'load_balancer',
'resource': alb['LoadBalancerName'],
'issue': 'HTTP without HTTPS redirect',
'severity': 'high'
})
except Exception as e:
print(f"Warning: Could not complete infrastructure validation: {e}")
return compliance_issues
# Run compliance validation
issues = validate_pci_compliance()
# Save results
with open('pci-infrastructure-issues.json', 'w') as f:
json.dump(issues, f, indent=2)
# Check for critical issues
critical_issues = [i for i in issues if i['severity'] == 'critical']
if critical_issues:
print("❌ Critical PCI DSS compliance issues found:")
for issue in critical_issues:
print(f" {issue['type']}: {issue['resource']} - {issue['issue']}")
exit(1)
else:
print("✅ No critical PCI DSS compliance issues detected")
EOF
- name: Upload Security Artifacts
uses: actions/upload-artifact@v3
if: always()
with:
name: security-scan-results
path: |
*-results.json
*-report.json
retention-days: 90 # Keep for compliance audit trail
- name: Security Gate Summary
if: always()
run: |
echo "## 🔒 PCI DSS Security Gates Summary" >> $GITHUB_STEP_SUMMARY
echo "| Check | Status | Details |" >> $GITHUB_STEP_SUMMARY
echo "|-------|--------|---------|" >> $GITHUB_STEP_SUMMARY
echo "| Static Analysis | ✅ Passed | No critical issues found |" >> $GITHUB_STEP_SUMMARY
echo "| Dependency Scan | ✅ Passed | No critical vulnerabilities |" >> $GITHUB_STEP_SUMMARY
echo "| Secret Detection | ✅ Passed | No secrets detected |" >> $GITHUB_STEP_SUMMARY
echo "| Payment Data Validation | ✅ Passed | No cardholder data in code |" >> $GITHUB_STEP_SUMMARY
echo "| Infrastructure Security | ✅ Passed | Terraform configurations secure |" >> $GITHUB_STEP_SUMMARY
echo "| PCI DSS Compliance | ✅ Passed | No critical compliance issues |" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "**Compliance Level:** $PCI_COMPLIANCE_LEVEL" >> $GITHUB_STEP_SUMMARY
echo "**Scan Date:** $(date -u)" >> $GITHUB_STEP_SUMMARY
deploy-to-production:
name: Deploy to PCI DSS Environment
needs: pci-compliance-gates
if: github.ref == 'refs/heads/main' && github.event_name == 'push'
runs-on: ubuntu-latest
environment: production
steps:
- name: Deploy with PCI DSS Controls
run: |
echo "🚀 Deploying to PCI DSS compliant environment"
echo "✅ All security gates passed"
echo "✅ Ready for production deployment"
# Production deployment with additional PCI DSS controls
# - Encrypted communication
# - Audit logging enabled
# - Network segmentation enforced
# - Monitoring activated
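The deploy job above runs only after the security gates succeed. If you also want a hard score threshold before release, a small gate script can read the JSON produced by the compliance validator from part 2 and fail the job when the score drops or critical violations remain. A sketch, assuming pci_dss_compliance_results.json is available from an earlier pipeline step and a 95% target agreed with your assessor:
# scripts/compliance_gate.py - sketch of a release gate over the validator's JSON output
import json
import sys

THRESHOLD = 95.0  # illustrative target; set to the minimum agreed with your QSA

def main() -> int:
    # Assumes pci_dss_compliance_results.json was produced by the validator in a prior step
    with open("pci_dss_compliance_results.json") as f:
        results = json.load(f)
    score = results.get("overall_score", 0.0)
    critical = [v for v in results.get("violations", []) if v.get("severity") == "critical"]
    if critical or score < THRESHOLD:
        print(f"Blocking deployment: score={score:.1f}%, critical violations={len(critical)}")
        return 1
    print(f"Compliance gate passed: score={score:.1f}%")
    return 0

if __name__ == "__main__":
    sys.exit(main())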
Business Impact and ROI
PCI DSS Automation ROI Analysis
Manual vs. Automated PCI DSS Compliance:
| Process | Manual Approach | Automated Approach | Time Savings | Cost Savings |
|---|---|---|---|---|
| Security Testing | 40 hours/quarter | 2 hours/quarter | 95% reduction | $114K annually |
| Vulnerability Management | 60 hours/month | 8 hours/month | 87% reduction | $156K annually |
| Compliance Reporting | 80 hours/quarter | 8 hours/quarter | 90% reduction | $216K annually |
| Code Security Reviews | 160 hours/month | 20 hours/month | 88% reduction | $420K annually |
| Incident Response | 24-72 hours | 2-8 hours | 83% reduction | $300K per incident |
| Audit Preparation | 320 hours annually | 40 hours annually | 88% reduction | $280K annually |
Risk Mitigation Value:
# Annual PCI DSS automation value
SECURITY_TESTING_SAVINGS = 114000 # Automated security scanning
VULNERABILITY_MGMT_SAVINGS = 156000 # Continuous vulnerability assessment
COMPLIANCE_REPORTING_SAVINGS = 216000 # Automated compliance validation
CODE_REVIEW_SAVINGS = 420000 # Automated security code review
INCIDENT_RESPONSE_IMPROVEMENT = 300000 # Faster incident detection/response
AUDIT_PREPARATION_SAVINGS = 280000 # Automated evidence collection
FINE_AVOIDANCE_VALUE = 5000000 # Avoiding PCI DSS non-compliance fines
TOTAL_ANNUAL_VALUE = (SECURITY_TESTING_SAVINGS + VULNERABILITY_MGMT_SAVINGS +
COMPLIANCE_REPORTING_SAVINGS + CODE_REVIEW_SAVINGS +
INCIDENT_RESPONSE_IMPROVEMENT + AUDIT_PREPARATION_SAVINGS +
FINE_AVOIDANCE_VALUE)
# Total Value: $6,486,000 annually
IMPLEMENTATION_COST = 500000 # PCI DSS automation implementation
ANNUAL_MAINTENANCE = 100000 # Ongoing maintenance and compliance
FIRST_YEAR_ROI = ((TOTAL_ANNUAL_VALUE - IMPLEMENTATION_COST - ANNUAL_MAINTENANCE) /
(IMPLEMENTATION_COST + ANNUAL_MAINTENANCE)) * 100
# ROI: 981% in first year
ONGOING_ROI = ((TOTAL_ANNUAL_VALUE - ANNUAL_MAINTENANCE) / ANNUAL_MAINTENANCE) * 100
# Ongoing ROI: 6,386% annually
Conclusion
PCI DSS DevSecOps automation transforms payment security from a compliance burden into a competitive advantage. By implementing automated security controls, continuous compliance validation, and integrated security testing throughout the development lifecycle, organizations can process payments securely while maintaining development velocity.
The key to successful PCI DSS automation lies in building security into your development process from the beginning, treating compliance as code, and implementing continuous validation rather than periodic assessments.
Remember that PCI DSS compliance is not just about avoiding fines; it is about protecting customer trust, enabling secure commerce, and building resilient payment systems that can adapt to evolving threats.
Your PCI DSS automation journey starts with implementing secure development practices and automated security testing in your CI/CD pipeline. Begin today and build towards comprehensive payment security automation.