PCI DSS DevSecOps: Automatización del Cumplimiento de Seguridad de Pagos en Entornos de Nube

PCI DSS DevSecOps: Automatización del Cumplimiento de Seguridad de Pagos en Entornos de Nube

El desafío de cumplimiento de PCI DSS en sistemas de pago nativos en la nube

Su organización procesa millones de transacciones de pago diariamente a través de microservicios nativos en la nube, funciones sin servidor y pasarelas de pago de terceros. Cada componente debe mantener el cumplimiento de PCI DSS mientras permite ciclos de desarrollo rápidos y despliegue continuo. La validación manual de cumplimiento crea cuellos de botella en el despliegue, aumenta los riesgos de seguridad y puede resultar en costosos fallos de cumplimiento que amenazan su capacidad para procesar pagos.

La automatización de DevSecOps de PCI DSS transforma la seguridad de los pagos de una barrera de despliegue en una capacidad de seguridad integrada, proporcionando validación continua de cumplimiento, controles de seguridad automatizados y monitoreo de riesgos en tiempo real a lo largo de su canal de desarrollo y despliegue.

PCI DSS en arquitectura nativa en la nube

PCI DSS (Estándar de Seguridad de Datos de la Industria de Tarjetas de Pago) requiere controles de seguridad integrales en todos los sistemas que almacenan, procesan o transmiten datos de titulares de tarjetas. En entornos nativos de la nube, esto significa implementar seguridad en cada capa mientras se mantiene la agilidad y escalabilidad que las aplicaciones modernas demandan.

Requisitos de PCI DSS para Entornos en la Nube

Requisitos Básicos de Cumplimiento:

RequisitoDescripciónImplementación en la NubeEnfoque de Automatización
Req 1-2Seguridad de RedAislamiento de VPC, WAF, NACLsValidación de Infraestructura como Código
Req 3Protección de Datos del TarjetahabienteCifrado en reposo/transmisiónVerificación automatizada de cifrado
Req 4Transmisión SeguraTLS 1.2+, gestión de certificadosAutomatización del ciclo de vida de certificados
Req 6Desarrollo SeguroSAST, DAST, escaneo de dependenciasPuertas de seguridad en CI/CD
Req 7Control de AccesoRBAC, privilegio mínimoRevisión automatizada de acceso
Req 8AutenticaciónMFA, contraseñas fuertesIntegración del proveedor de identidad
Req 10Registro y MonitoreoRegistro centralizado, SIEMAnálisis de registros en tiempo real
Req 11Pruebas de SeguridadEscaneo de vulnerabilidades, pruebas de penetraciónEscaneo de seguridad automatizado

Arquitectura PCI DSS Nativa en la Nube

1. Microservicio de Procesamiento de Pagos Seguro

# payment-security/secure_payment_service.py
from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, Field, validator
from typing import Optional, Dict, List
from datetime import datetime, timedelta
import uuid
import hashlib
import cryptography.fernet
import asyncio
import logging
from enum import Enum
import re

app = FastAPI(
    title="Servicio de Pago Seguro",
    description="API de Procesamiento de Pagos Cumpliente con PCI DSS",
    version="2.0.0"
)

# Requisito 6 de PCI DSS: Desarrollo Seguro
class PaymentMethod(str, Enum):
    CREDIT_CARD = "credit_card"
    DEBIT_CARD = "debit_card"
    ACH = "ach"
    DIGITAL_WALLET = "digital_wallet"

class TransactionStatus(str, Enum):
    PENDING = "pendiente"
    AUTHORIZED = "autorizado"
    CAPTURED = "capturado"
    DECLINED = "rechazado"
    FAILED = "fallido"
    REFUNDED = "reembolsado"

class SecurityLevel(str, Enum):
    L1 = "nivel_1"  # > 6M transacciones/año
    L2 = "nivel_2"  # 1-6M transacciones/año
    L3 = "nivel_3"  # 20K-1M transacciones de comercio electrónico/año
    L4 = "nivel_4"  # < 20K transacciones de comercio electrónico/año

# PCI DSS Requisito 3: Protección de Datos del Titular de la Tarjeta
class TokenizedPaymentRequest(BaseModel):
    payment_token: str = Field(..., min_length=20, max_length=50)
    amount: float = Field(..., gt=0, le=999999.99)
    currency: str = Field(..., regex=r'^[A-Z]{3}$')
    merchant_id: str = Field(..., min_length=8, max_length=20)
    order_id: str = Field(..., min_length=6, max_length=50)
    payment_method: PaymentMethod
    customer_id: Optional[str] = Field(None, min_length=6, max_length=50)
    billing_address: Optional[Dict] = None
    metadata: Optional[Dict] = None

    @validator('payment_token')
    def validate_payment_token(cls, v):
        # Asegúrese de que el token no contenga datos reales de la tarjeta
        if re.search(r'\d{13,19}', v):
            raise ValueError('El token de pago no puede contener números de tarjeta')
        return v

class PaymentResponse(BaseModel):
    transaction_id: str
    status: TransactionStatus
    payment_method: PaymentMethod
    amount: float
    currency: str
    authorized_amount: Optional[float] = None
    authorization_code: Optional[str] = None
    reference_number: str
    timestamp: datetime
    merchant_id: str
    compliance_metadata: Dict

class PCILogger:
    """Requisito PCI DSS 10: Registro y Monitoreo"""

    def __init__(self):
        self.logger = logging.getLogger("pci_audit")
        self.logger.setLevel(logging.INFO)

        # Configurar manejador de registro seguro
        handler = logging.StreamHandler()
        formatter = logging.Formatter(
            '%(asctime)s | %(name)s | %(levelname)s | %(message)s'
        )
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)

    def log_payment_event(self, event_type: str, transaction_id: str,
                         user_id: str, details: Dict):
        """Registrar eventos de pago para el cumplimiento de PCI DSS"""

        # PCI DSS 10.2: Registrar todo acceso a datos de tarjetas de pago
        log_entry = {
            'event_type': event_type,
            'transaction_id': transaction_id,
            'user_id': user_id,
            'timestamp': datetime.now().isoformat(),
            'source_ip': details.get('source_ip', 'unknown'),
            'user_agent': details.get('user_agent', 'unknown'),
            'success': details.get('success', False),
            'amount': details.get('amount'),
            'currency': details.get('currency'),
            'merchant_id': details.get('merchant_id'),
            'compliance_level': details.get('compliance_level', 'unknown')
        }

# Asegurar que no haya datos sensibles en los registros (PCI DSS 3.4)
if 'card_number' in str(details):
    self.logger.error(f"VIOLACIÓN DE SEGURIDAD: Intento de registro de datos de tarjeta - {transaction_id}")
    raise SecurityException("Los datos de la tarjeta no pueden ser registrados")

self.logger.info(f"EVENTO_DE_PAGO: {log_entry}")

def log_access_attempt(self, endpoint: str, user_id: str, success: bool,
                      source_ip: str, details: Dict = None):
    """Registrar intentos de acceso para monitoreo de seguridad"""

    access_log = {
        'event_type': 'intento_de_acceso',
        'endpoint': endpoint,
        'user_id': user_id,
        'success': success,
        'source_ip': source_ip,
        'timestamp': datetime.now().isoformat(),
        'details': details or {}
    }

    self.logger.info(f"REGISTRO_DE_ACCESO: {access_log}")

class PCIDataProtection:
    """Requisito PCI DSS 3: Proteger los datos almacenados del titular de la tarjeta"""

    def __init__(self, encryption_key: str):
        self.fernet = cryptography.fernet.Fernet(encryption_key.encode())
        self.token_prefix = "tok_"

    def tokenize_sensitive_data(self, sensitive_data: str) -> str:
        """Tokenizar datos de pago sensibles"""

        # Generar token seguro
        token_data = {
            'original_hash': hashlib.sha256(sensitive_data.encode()).hexdigest(),
            'timestamp': datetime.now().isoformat(),
            'token_id': str(uuid.uuid4())
        }

        # Encriptar y almacenar el mapeo del token
        encrypted_mapping = self.fernet.encrypt(sensitive_data.encode())
        token = f"{self.token_prefix}{token_data['token_id']}"

        # En producción, almacenar encrypted_mapping en una bóveda de tokens segura
        # con gestión adecuada de claves (HSM o KMS en la nube)

        return token

    def detokenize_data(self, token: str) -> str:
        """Detokenizar datos para procesamiento (acceso restringido)"""

        if not token.startswith(self.token_prefix):
            raise ValueError("Formato de token inválido")

        # En producción, recuperar desde un almacén de tokens seguro
        # Esta operación debe ser registrada y monitoreada exhaustivamente
        # El acceso al almacén de tokens debe requerir autenticación adicional

        # Simular búsqueda en el almacén de tokens
        return "marcador_de_posición_datos_desetokenizados"

    def encrypt_for_storage(self, data: str) -> str:
        """Encriptar datos para almacenamiento seguro"""
        return self.fernet.encrypt(data.encode()).decode()

    def decrypt_from_storage(self, encrypted_data: str) -> str:
        """Desencriptar datos del almacenamiento"""
        return self.fernet.decrypt(encrypted_data.encode()).decode()

    def validate_no_cardholder_data(self, data: Dict) -> bool:
        """Validar que los datos no contengan datos prohibidos del titular de la tarjeta"""

        data_str = str(data).lower()

        # Verificar posibles números de tarjeta (regex básico)
        patrones_tarjeta = [
            r'\b4\d{15}\b',           # Visa
            r'\b5[1-5]\d{14}\b',      # Mastercard
            r'\b3[47]\d{13}\b',       # American Express
            r'\b6(?:011|5\d{2})\d{12}\b'  # Discover
        ]

        for patron in patrones_tarjeta:
            if re.search(patron, data_str):
                return False

        # Verificar patrones de CVV
        if re.search(r'\b\d{3,4}\b', data_str) and ('cvv' in data_str or 'cvc' in data_str):
            return False

        return True

class ControlAccesoPCI:
    """Requisito PCI DSS 7-8: Control de Acceso y Autenticación"""

    def __init__(self):
        self.authorized_roles = {
            'procesador_de_pago': ['procesar_pago', 'ver_transacción'],
            'administrador_de_pago': ['procesar_pago', 'ver_transacción', 'reembolsar_pago'],
            'administrador_de_seguridad': ['ver_registros', 'gestionar_acceso'],
            'auditor_de_cumplimiento': ['ver_registros', 'ver_transacción', 'exportar_auditoría']
        }

        self.politicas_de_seguridad = {
            'max_intentos_fallidos': 3,
            'duración_bloqueo_cuenta': 30,  # minutos
            'longitud_minima_contraseña': 12,
            'mfa_requerido': True,
            'tiempo_de_espera_sesión': 15  # minutos
        }

    def validar_permisos_usuario(self, id_usuario: str, permiso_requerido: str) -> bool:
        """Validar que el usuario tenga los permisos requeridos"""

        # En producción, integrar con proveedor de identidad (OAuth2, SAML)
        roles_usuario = self.obtener_roles_usuario(id_usuario)

        for role in user_roles:
            if required_permission in self.authorized_roles.get(role, []):
                return True

        return False

    def get_user_roles(self, user_id: str) -> List[str]:
        """Obtener roles de usuario del proveedor de identidad"""
        # Simular búsqueda de roles
        return ['procesador_de_pagos']

    def enforce_least_privilege(self, user_id: str, requested_action: str) -> bool:
        """Aplicar el principio de acceso de menor privilegio"""

        # Verificar si el usuario tiene los permisos mínimos requeridos
        return self.validate_user_permissions(user_id, requested_action)

    def log_privileged_access(self, user_id: str, action: str, resource: str):
        """Registrar acceso privilegiado para cumplimiento"""

        access_log = {
            'user_id': user_id,
            'action': action,
            'resource': resource,
            'timestamp': datetime.now().isoformat(),
            'privilege_level': 'alto'
        }

        # Enviar al sistema de monitoreo de seguridad
        logging.getLogger("privileged_access").info(access_log)

class PaymentSecurityService:
    """Servicio principal de procesamiento de pagos con cumplimiento PCI DSS"""

    def __init__(self):
        self.logger = PCILogger()
        self.data_protection = PCIDataProtection("clave_de_encriptación_del_hsm")
        self.access_control = PCIAccessControl()

        # Requisito PCI DSS 11: Pruebas de seguridad regulares
        self.security_scan_last_run = datetime.now() - timedelta(days=7)
        self.compliance_level = SecurityLevel.L1

    async def process_payment(self, payment_request: TokenizedPaymentRequest,
                            user_id: str, source_ip: str, user_agent: str) -> PaymentResponse:
        """Procesar pago con cumplimiento completo de PCI DSS"""

        transaction_id = str(uuid.uuid4())

```python
try:
    # Requisito 7 de PCI DSS: Control de Acceso
    if not self.access_control.validate_user_permissions(user_id, 'process_payment'):
        raise HTTPException(status_code=403, detail="Permisos insuficientes")

    # Requisito 3 de PCI DSS: Validación de Protección de Datos
    if not self.data_protection.validate_no_cardholder_data(payment_request.dict()):
        self.logger.log_payment_event(
            "security_violation", transaction_id, user_id,
            {"error": "cardholder_data_detected", "source_ip": source_ip}
        )
        raise HTTPException(status_code=400, detail="Formato de datos inválido")

Requisito 10 de PCI DSS: Registro de Auditoría

self.logger.log_payment_event( “payment_initiated”, transaction_id, user_id, { “amount”: payment_request.amount, “currency”: payment_request.currency, “merchant_id”: payment_request.merchant_id, “source_ip”: source_ip, “user_agent”: user_agent, “success”: True } )

Procesar pago a través de pasarela segura

payment_result = await self._process_payment_gateway(payment_request, transaction_id)

Crear respuesta conforme

response = PaymentResponse( transaction_id=transaction_id, status=payment_result[‘status’], payment_method=payment_request.payment_method, amount=payment_request.amount, currency=payment_request.currency, authorized_amount=payment_result.get(‘authorized_amount’), authorization_code=payment_result.get(‘auth_code’), reference_number=payment_result[‘reference’], timestamp=datetime.now(), merchant_id=payment_request.merchant_id, compliance_metadata={ ‘pci_dss_version’: ‘4.0’, ‘compliance_level’: self.compliance_level.value, ‘security_scan_status’: ‘current’, ‘encryption_standard’: ‘AES-256-GCM’ } )

Registrar transacción exitosa

self.logger.log_payment_event( “pago_completado”, transaction_id, user_id, { “estado”: payment_result[‘status’], “monto”: payment_request.amount, “moneda”: payment_request.currency, “id_comerciante”: payment_request.merchant_id, “ip_origen”: source_ip, “éxito”: True } )

return response

except Exception as e: # Registrar transacción fallida self.logger.log_payment_event( “payment_failed”, transaction_id, user_id, { “error”: str(e), “amount”: payment_request.amount, “currency”: payment_request.currency, “merchant_id”: payment_request.merchant_id, “source_ip”: source_ip, “success”: False } ) raise

async def _process_payment_gateway(self, payment_request: TokenizedPaymentRequest, transaction_id: str) -> Dict: """Procesar pago a través de una pasarela segura"""

# Simular procesamiento de pasarela de pago segura
# En producción, integrar con procesador de pagos compatible con PCI DSS

await asyncio.sleep(0.5)  # Simular tiempo de procesamiento

    return {
        'status': TransactionStatus.AUTHORIZED,
        'authorized_amount': payment_request.amount,
        'auth_code': f"AUTH{str(uuid.uuid4())[:8].upper()}",
        'reference': f"REF{str(uuid.uuid4())[:12].upper()}"
    }

async def validate_compliance_status(self) -> Dict:
    """Validar el estado actual de cumplimiento PCI DSS"""

    compliance_checks = {
        'network_security': await self._check_network_security(),
        'data_encryption': await self._check_data_encryption(),
        'access_controls': await self._check_access_controls(),
        'monitoring': await self._check_monitoring_status(),
        'security_testing': await self._check_security_testing(),
        'vulnerability_management': await self._check_vulnerability_status()
    }

Calcular la puntuación general de cumplimiento

passed_checks = sum(1 for check in compliance_checks.values() if check[‘status’] == ‘compliant’) total_checks = len(compliance_checks) compliance_score = (passed_checks / total_checks) * 100

return { ‘compliance_score’: compliance_score, ‘compliance_level’: self.compliance_level.value, ‘last_assessment’: datetime.now().isoformat(), ‘detailed_checks’: compliance_checks, ‘next_assessment_due’: (datetime.now() + timedelta(days=90)).isoformat(), ‘certification_status’: ‘active’ if compliance_score >= 95 else ‘requires_attention’ }

async def _check_network_security(self) -> Dict: """Verificar el Requisito PCI DSS 1-2: Seguridad de la Red""" return { ‘requirement’: ‘Seguridad de la Red (Req 1-2)’, ‘status’: ‘cumple’, ‘details’: { ‘firewall_configured’: True, ‘default_passwords_changed’: True, ‘network_segmentation’: True, ‘wireless_encryption’: True } }

async def _check_data_encryption(self) -> Dict: """Verificar el Requisito PCI DSS 3-4: Protección de Datos""" return { ‘requirement’: ‘Protección de Datos (Req 3-4)’, ‘status’: ‘cumple’, ‘details’: { ‘cardholder_data_encrypted’: True, ‘encryption_keys_protected’: True, ‘transmission_encrypted’: True, ‘key_rotation_current’: True } }

async def _check_access_controls(self) -> Dict: """Verificar el Requisito 7-8 de PCI DSS: Control de Acceso""" return { ‘requirement’: ‘Control de Acceso (Req 7-8)’, ‘status’: ‘cumple’, ‘details’: { ‘role_based_access’: True, ‘unique_user_ids’: True, ‘multi_factor_auth’: True, ‘password_policy_enforced’: True } }

async def _check_monitoring_status(self) -> Dict: """Verificar el Requisito 10 de PCI DSS: Monitoreo""" return { ‘requirement’: ‘Monitoreo (Req 10)’, ‘status’: ‘cumple’, ‘details’: { ‘audit_logging_enabled’: True, ‘log_monitoring_active’: True, ‘time_synchronization’: True, ‘log_integrity_protected’: True } }

async def _check_security_testing(self) -> Dict: """Verificar el Requisito 11 de PCI DSS: Pruebas de Seguridad""" days_since_scan = (datetime.now() - self.security_scan_last_run).days

return {
    'requirement': 'Pruebas de Seguridad (Req 11)',
    'status': 'cumple' if days_since_scan <= 90 else 'no_cumple',
    'details': {
        'vulnerability_scan_current': days_since_scan <= 90,
        'penetration_test_current': True,
        'network_intrusion_detection': True,
        'file_integrity_monitoring': True,
        'days_since_last_scan': days_since_scan
    }
}

async def _check_vulnerability_status(self) -> Dict:
    """Verificar el Requisito 6 de PCI DSS: Gestión de Vulnerabilidades"""
    return {
        'requirement': 'Gestión de Vulnerabilidades (Req 6)',
        'status': 'cumple',
        'details': {
            'security_patches_current': True,
            'secure_development_practices': True,
            'application_security_testing': True,
            'vulnerability_scanning_regular': True
        }
    }

Inicializar servicios

payment_service = PaymentSecurityService() security = HTTPBearer()

Puntos finales de API compatibles con PCI DSS

@app.post(“/payment/process”, response_model=PaymentResponse) async def process_payment( payment_request: TokenizedPaymentRequest, request: Request, credentials: HTTPAuthorizationCredentials = Depends(security) ): """Procesar pago con cumplimiento total de PCI DSS"""

Extraer metadatos de la solicitud

source_ip = request.client.host user_agent = request.headers.get(“user-agent”, “unknown”) user_id = “authenticated_user” # Extraer del token JWT en producción

return await payment_service.process_payment( payment_request, user_id, source_ip, user_agent )

@app.get(“/compliance/status”) async def get_compliance_status(credentials: HTTPAuthorizationCredentials = Depends(security)): """Obtener el estado actual de cumplimiento PCI DSS""" return await payment_service.validate_compliance_status()

@app.get(“/health/security”) async def security_health_check(): """Chequeo de salud enfocado en la seguridad para el sistema de pagos""" return { ‘status’: ‘saludable’, ‘timestamp’: datetime.now().isoformat(), ‘security_features’: { ‘encryption_enabled’: True, ‘authentication_required’: True, ‘audit_logging_active’: True, ‘monitoring_enabled’: True }, ‘compliance’: { ‘pci_dss_version’: ‘4.0’, ‘last_validation’: datetime.now().isoformat() } }

if name == “main”: import uvicorn uvicorn.run(app, host=“0.0.0.0”, port=8000, ssl_keyfile=“key.pem”, ssl_certfile=“cert.pem”)


**2. Validación Automatizada de Cumplimiento PCI DSS**

```python
# pci-compliance/automated_compliance_validator.py
import boto3
import json
import subprocess
import requests
from typing import Dict, List, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass
import yaml
import logging

@dataclass
class ComplianceViolation:
    requirement_id: str
    severity: str
    description: str
    resource: str
    remediation: str
    detected_at: datetime

class PCIDSSComplianceValidator:
    """Validación automatizada de cumplimiento PCI DSS para infraestructura en la nube"""

    def __init__(self, aws_profile: str = None):
        self.session = boto3.Session(profile_name=aws_profile)
        self.ec2 = self.session.client('ec2')
        self.elbv2 = self.session.client('elbv2')
        self.rds = self.session.client('rds')
        self.cloudtrail = self.session.client('cloudtrail')
        self.config = self.session.client('config')
        self.inspector = self.session.client('inspector2')
    self.logger = logging.getLogger(__name__)
    self.violations = []

    # Reglas de cumplimiento PCI DSS
    self.compliance_rules = {
        'req_1_2': self._validate_network_security,
        'req_3': self._validate_data_protection,
        'req_4': self._validate_secure_transmission,
        'req_6': self._validate_secure_development,
        'req_7_8': self._validate_access_controls,
        'req_10': self._validate_logging_monitoring,
        'req_11': self._validate_security_testing
    }

async def run_comprehensive_compliance_scan(self) -> Dict:
    """Ejecutar escaneo integral de cumplimiento PCI DSS"""

    scan_results = {
        'scan_id': f"pci_scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
        'scan_timestamp': datetime.now().isoformat(),
        'scope': 'infraestructura_completa',
        'compliance_requirements': {},
        'violations': [],
        'overall_score': 0,
        'certification_status': 'desconocido'
    }

    # Ejecutar todas las verificaciones de cumplimiento
    for req_id, validator_func in self.compliance_rules.items():
        self.logger.info(f"Ejecutando verificación de cumplimiento: {req_id}")

        try:
            requirement_result = await validator_func()
            scan_results['compliance_requirements'][req_id] = requirement_result

            # Recopilar violaciones
            if not requirement_result['compliant']:
                scan_results['violations'].extend(requirement_result['violations'])

except Exception as e: self.logger.error(f”Error al validar {req_id}: {str(e)}”) scan_results[‘compliance_requirements’][req_id] = { ‘compliant’: False, ‘error’: str(e), ‘violations’: [] }

Calcular el puntaje general de cumplimiento

compliant_requirements = sum( 1 for req in scan_results[‘compliance_requirements’].values() if req.get(‘compliant’, False) ) total_requirements = len(scan_results[‘compliance_requirements’]) scan_results[‘overall_score’] = (compliant_requirements / total_requirements) * 100

Determinar el estado de certificación

if scan_results[‘overall_score’] >= 95: scan_results[‘certification_status’] = ‘cumple’ elif scan_results[‘overall_score’] >= 85: scan_results[‘certification_status’] = ‘mayormente_cumple’ else: scan_results[‘certification_status’] = ‘no_cumple’

return scan_results

async def _validate_network_security(self) -> Dict: """Validar Requisitos PCI DSS 1-2: Seguridad de la Red"""

violaciones = []
verificaciones = {
    'vpc_isolation': False,
    'security_groups_configured': False,
    'nacl_configured': False,
    'waf_enabled': False,
    'default_passwords_changed': False
}

# Verificar la configuración de VPC
vpcs = self.ec2.describe_vpcs()['Vpcs']
if vpcs:
    verificaciones['vpc_isolation'] = True

Verificar VPCs dedicadas para el procesamiento de pagos

payment_vpcs = [ vpc for vpc in vpcs if ‘payment’ in vpc.get(‘Tags’, [{}])[0].get(‘Value’, ”).lower() ]

if not payment_vpcs: violations.append(ComplianceViolation( requirement_id=‘1.1.4’, severity=‘high’, description=‘No se encontró VPC dedicada para el procesamiento de pagos’, resource=‘vpc_configuration’, remediation=‘Crear VPC dedicada para el entorno de datos de tarjetas de pago’, detected_at=datetime.now() ))

Verificar Grupos de Seguridad

security_groups = self.ec2.describe_security_groups()[‘SecurityGroups’] restrictive_sgs = []

    for sg in grupos_de_seguridad:
        # Verificar reglas demasiado permisivas
        for regla in sg.get('IpPermissions', []):
            for rango_ip in regla.get('IpRanges', []):
                if rango_ip.get('CidrIp') == '0.0.0.0/0':
                    violaciones.append(ComplianceViolation(
                        requirement_id='1.3.1',
                        severity='critical',
                        description=f'El grupo de seguridad {sg["GroupId"]} permite acceso sin restricciones',
                        resource=sg['GroupId'],
                        remediation='Restringir las reglas del grupo de seguridad al acceso mínimo requerido',
                        detected_at=datetime.now()
                    ))
                else:
                    grupos_seguridad_restrictivos.append(sg['GroupId'])

    checks['grupos_seguridad_configurados'] = len(grupos_seguridad_restrictivos) > 0

    # Verificar configuración de WAF
    try:
        load_balancers = self.elbv2.describe_load_balancers()['LoadBalancers']
        waf_protected_albs = []
for alb in load_balancers:
    # Verificar asociación con WAF
    try:
        waf_info = self.elbv2.describe_load_balancer_attributes(
            LoadBalancerArn=alb['LoadBalancerArn']
        )
        # En la implementación real, verificar la asociación real con WAF
        waf_protected_albs.append(alb['LoadBalancerArn'])
    except:
        violations.append(ComplianceViolation(
            requirement_id='1.3.4',
            severity='medium',
            description=f'El balanceador de carga {alb["LoadBalancerName"]} no está protegido por WAF',
            resource=alb['LoadBalancerArn'],
            remediation='Asociar WAF con el balanceador de carga',
            detected_at=datetime.now()
        ))

checks[‘waf_enabled’] = len(waf_protected_albs) > 0 except Exception as e: self.logger.warning(f”No se pudo validar la configuración de WAF: {str(e)}“)

Determinar el cumplimiento general de la seguridad de la red

compliant = len(violations) == 0 and all(checks.values())

return { ‘requirement’: ‘Seguridad de la Red (Req 1-2)’, ‘compliant’: compliant, ‘checks’: checks, ‘violations’: [vars(v) for v in violations], ‘score’: sum(checks.values()) / len(checks) * 100 }

async def _validate_data_protection(self) -> Dict: """Validar el Requisito 3 de PCI DSS: Protección de Datos"""

violations = []
checks = {
    'rds_encryption_enabled': False,
    'ebs_encryption_enabled': False,
    's3_encryption_enabled': False,
    'kms_key_rotation': False,
    'no_cardholder_data_storage': False
}

Verificar cifrado de RDS

try: rds_instances = self.rds.describe_db_instances()[‘DBInstances’] encrypted_instances = [ db for db in rds_instances if db.get(‘StorageEncrypted’, False) ]

checks['rds_encryption_enabled'] = len(encrypted_instances) == len(rds_instances)
for db in rds_instances:
    if not db.get('StorageEncrypted', False):
        violations.append(ComplianceViolation(
            requirement_id='3.4',
            severity='critical',
            description=f'La instancia de RDS {db["DBInstanceIdentifier"]} no está cifrada',
            resource=db['DBInstanceIdentifier'],
            remediation='Habilitar cifrado en reposo para la instancia de RDS',
            detected_at=datetime.now()
        ))
except Exception as e:
    self.logger.warning(f"No se pudo validar el cifrado de RDS: {str(e)}")

# Verificar cifrado de EBS
try:
    volumes = self.ec2.describe_volumes()['Volumes']
    encrypted_volumes = [vol for vol in volumes if vol.get('Encrypted', False)]

    checks['ebs_encryption_enabled'] = len(encrypted_volumes) == len(volumes)
        for vol in volumes:
            if not vol.get('Encrypted', False):
                violations.append(ComplianceViolation(
                    requirement_id='3.4',
                    severity='high',
                    description=f'El volumen EBS {vol["VolumeId"]} no está cifrado',
                    resource=vol['VolumeId'],
                    remediation='Habilitar cifrado para el volumen EBS',
                    detected_at=datetime.now()
                ))
    except Exception as e:
        self.logger.warning(f"No se pudo validar el cifrado de EBS: {str(e)}")

    # Verificar rotación de clave KMS
    try:
        kms = self.session.client('kms')
        keys = kms.list_keys()['Keys']

rotation_enabled_count = 0 for key in keys[:10]: # Verificar los primeros 10 claves try: rotation_status = kms.get_key_rotation_status(KeyId=key[‘KeyId’]) if rotation_status.get(‘KeyRotationEnabled’, False): rotation_enabled_count += 1 except: pass # La clave podría no soportar rotación

checks[‘kms_key_rotation’] = rotation_enabled_count > 0 except Exception as e: self.logger.warning(f”No se pudo validar la rotación de la clave KMS: {str(e)}“)

Simular escaneo de datos del titular de la tarjeta

checks[‘no_cardholder_data_storage’] = await self._scan_for_cardholder_data()

compliant = len(violations) == 0 and all(checks.values())

    return {
        'requisito': 'Protección de Datos (Req 3)',
        'cumple': cumple,
        'verificaciones': verificaciones,
        'violaciones': [vars(v) for v in violaciones],
        'puntuación': sum(verificaciones.values()) / len(verificaciones) * 100
    }

async def _validar_transmisión_segura(self) -> Dict:
    """Validar el Requisito 4 de PCI DSS: Transmisión Segura"""

    violaciones = []
    verificaciones = {
        'tls_1_2_mínimo': False,
        'gestión_de_certificados': False,
        'criptografía_fuerte': False,
        'encriptación_inalámbrica': False
    }

    # Verificar la configuración SSL/TLS del balanceador de carga
    try:
        balanceadores_de_carga = self.elbv2.describe_load_balancers()['LoadBalancers']

        for alb in balanceadores_de_carga:
            oyentes = self.elbv2.describe_listeners(
                LoadBalancerArn=alb['LoadBalancerArn']
            )['Listeners']

ssl_listeners = [l for l in listeners if l[‘Protocol’] in [‘HTTPS’, ‘TLS’]]

for listener in ssl_listeners: # Verificar política SSL ssl_policy = listener.get(‘SslPolicy’, ”)

if 'TLSv1.2' not in ssl_policy:
    violations.append(ComplianceViolation(
        requirement_id='4.1',
        severity='high',
        description=f'El balanceador de carga utiliza una política SSL débil: {ssl_policy}',
        resource=alb['LoadBalancerArn'],
        remediation='Actualizar la política SSL para requerir TLS 1.2 como mínimo',
        detected_at=datetime.now()
    ))
else:
    checks['tls_1_2_minimum'] = True

Verificar oyentes HTTP (deberían redirigir a HTTPS)

http_listeners = [l for l in listeners if l[‘Protocol’] == ‘HTTP’] for listener in http_listeners: # Verificar si HTTP redirige a HTTPS default_actions = listener.get(‘DefaultActions’, []) redirect_actions = [ a for a in default_actions if a.get(‘Type’) == ‘redirect’ and a.get(‘RedirectConfig’, {}).get(‘Protocol’) == ‘HTTPS’ ]

                if not redirect_actions:
                    violations.append(ComplianceViolation(
                        requirement_id='4.1',
                        severity='medium',
                        description=f'El oyente HTTP no redirige a HTTPS',
                        resource=listener['ListenerArn'],
                        remediation='Configurar redirección de HTTP a HTTPS',
                        detected_at=datetime.now()
                    ))

    except Exception as e:
        self.logger.warning(f"No se pudo validar la configuración SSL del balanceador de carga: {str(e)}")

    # Verificar gestión de certificados
    try:
        acm = self.session.client('acm')
        certificates = acm.list_certificates()['CertificateSummaryList']
valid_certificates = 0
for cert in certificates:
    cert_details = acm.describe_certificate(CertificateArn=cert['CertificateArn'])
    certificate = cert_details['Certificate']

Verificar el estado y la expiración del certificado

if certificate[‘Status’] == ‘ISSUED’: not_after = certificate.get(‘NotAfter’) if not_after and not_after > datetime.now(not_after.tzinfo): valid_certificates += 1 else: violations.append(ComplianceViolation( requirement_id=‘4.1’, severity=‘high’, description=f’El certificado {cert[“CertificateArn”]} ha expirado o está por expirar pronto’, resource=cert[‘CertificateArn’], remediation=‘Renovar el certificado SSL’, detected_at=datetime.now() ))

checks[‘certificate_management’] = valid_certificates > 0

except Exception as e: self.logger.warning(f”No se pudo validar la gestión de certificados: {str(e)}”)

compliant = len(violations) == 0 and any(checks.values())

return {
    'requirement': 'Transmisión Segura (Req 4)',
    'compliant': compliant,
    'checks': checks,
    'violations': [vars(v) for v in violations],
    'score': sum(checks.values()) / len(checks) * 100
}

async def _validate_logging_monitoring(self) -> Dict:
    """Validar Requisito 10 de PCI DSS: Registro y Monitoreo"""

    violations = []
    checks = {
        'cloudtrail_enabled': False,
        'vpc_flow_logs': False,
        'application_logging': False,
        'log_integrity': False,
        'centralized_logging': False
    }

    # Verificar configuración de CloudTrail
    try:
        trails = self.cloudtrail.describe_trails()['trailList']
        active_trails = []

        for trail in trails:
            trail_status = self.cloudtrail.get_trail_status(Name=trail['TrailARN'])
            si trail_status.get('IsLogging', False):
                active_trails.append(trail)

                # Verificar si el sendero registra eventos de datos
                event_selectors = self.cloudtrail.get_event_selectors(
                    TrailName=trail['TrailARN']
                )

                has_data_events = any(
                    selector.get('ReadWriteType') == 'All'
                    for selector in event_selectors.get('EventSelectors', [])
                )

                if not has_data_events:
                    violations.append(ComplianceViolation(
                        requirement_id='10.2',
                        severity='medium',
                        description=f'CloudTrail {trail["Name"]} no está registrando eventos de datos',
                        resource=trail['TrailARN'],
                        remediation='Configurar CloudTrail para registrar eventos de datos',
                        detected_at=datetime.now()
                    ))
            else:
                violations.append(ComplianceViolation(
                    requirement_id='10.1',
                    severity='high',
                    description=f'CloudTrail {trail["Name"]} no está registrando activamente',
                    resource=trail['TrailARN'],
                    remediation='Habilitar el registro de CloudTrail',
                    detected_at=datetime.now()
                ))

checks[‘cloudtrail_enabled’] = len(active_trails) > 0

except Exception as e: self.logger.warning(f”No se pudo validar CloudTrail: {str(e)}“)

Verificar registros de flujo de VPC

try: vpcs = self.ec2.describe_vpcs()[‘Vpcs’]

for vpc in vpcs:
    flow_logs = self.ec2.describe_flow_logs(
        Filters=[
            {'Name': 'resource-id', 'Values': [vpc['VpcId']]}
        ]
    )['FlowLogs']

    active_flow_logs = [
        fl for fl in flow_logs
        if fl.get('FlowLogStatus') == 'ACTIVE'
    ]

            if not active_flow_logs:
                violations.append(ComplianceViolation(
                    requirement_id='10.2',
                    severity='medium',
                    description=f'VPC {vpc["VpcId"]} sin registros de flujo',
                    resource=vpc['VpcId'],
                    remediation='Habilitar registros de flujo de VPC',
                    detected_at=datetime.now()
                ))
            else:
                checks['vpc_flow_logs'] = True

    except Exception as e:
        self.logger.warning(f"No se pudo validar los registros de flujo de VPC: {str(e)}")

    compliant = len(violations) == 0 and sum(checks.values()) >= 3

    return {
        'requirement': 'Registro y Monitoreo (Req 10)',
        'compliant': compliant,
        'checks': checks,
        'violations': [vars(v) for v in violations],
        'score': sum(checks.values()) / len(checks) * 100
    }

async def _scan_for_cardholder_data(self) -> bool:
    """Escanear almacenamiento prohibido de datos del titular de la tarjeta"""

    # Simular escaneo de datos del titular de la tarjeta
    # En producción, esto usaría herramientas DLP como Amazon Macie

    self.logger.info("Escaneando datos del titular de la tarjeta...")

    # Simular resultados del escaneo
    return True  # No se encontraron datos del titular de la tarjeta

def generate_compliance_report(self, scan_results: Dict) -> str:
    """Generar informe completo de cumplimiento PCI DSS"""

    report = f"""

Informe de Cumplimiento PCI DSS

ID de Escaneo: {scan_results[‘scan_id’]} Fecha de Escaneo: {scan_results[‘scan_timestamp’]} Puntuación General: {scan_results[‘overall_score’]:.1f}% Estado de Certificación: {scan_results[‘certification_status’].upper()}

Resumen Ejecutivo

Este informe proporciona una evaluación completa del cumplimiento PCI DSS en todos los sistemas y aplicaciones dentro del alcance.

Desglose de Puntuación de Cumplimiento

"""

    for req_id, result in scan_results['compliance_requirements'].items():
        status_icon = "✅" if result.get('compliant', False) else "❌"
        report += f"- **{result.get('requirement', req_id)}**: {status_icon} {result.get('score', 0):.1f}%\n"

    report += f"""

Resumen de Violaciones

Total de Violaciones: {len(scan_results[‘violations’])}

Violaciones Críticas

"""

    critical_violations = [v for v in scan_results['violations'] if v.get('severity') == 'critical']
    for violation in critical_violations:
        report += f"""

{violation[‘requirement_id’]}: {violation[‘description’]}

  • Recurso: {violation[‘resource’]}

  • Remediación: {violation[‘remediation’]}

  • Detectado: {violation[‘detected_at’]} """

      report += f"""

Violaciones de Alta Prioridad

"""

    high_violations = [v for v in scan_results['violations'] if v.get('severity') == 'high']
    for violation in high_violations:
        report += f"""

{violation[‘requirement_id’]}: {violation[‘description’]}

  • Recurso: {violation[‘resource’]}

  • Remediación: {violation[‘remediation’]} """

      report += f"""

Recomendaciones

  1. Acciones inmediatas requeridas:

    • Abordar todas las violaciones críticas dentro de 24 horas
    • Implementar controles de seguridad de emergencia para hallazgos de alto riesgo
  2. Acciones a corto plazo (1-30 días):

    • Remediar todas las violaciones de alta prioridad
    • Mejorar las capacidades de monitoreo y registro
    • Realizar pruebas de seguridad adicionales
  3. Acciones a largo plazo (30-90 días):

    • Implementar monitoreo de cumplimiento automatizado
    • Mejorar la capacitación en concienciación sobre seguridad
    • Planificar evaluaciones de cumplimiento trimestrales

Próximos pasos

"""

  • Próxima Evaluación: {(datetime.now() + timedelta(days=90)).strftime(‘%Y-%m-%d’)}
  • Revisión Trimestral: Requerida para comerciantes de Nivel 1
  • Monitoreo Continuo: Implementar validación de cumplimiento en tiempo real

Apéndice

Detalles del Marco de Cumplimiento

  • Versión PCI DSS: 4.0
  • Tipo de Evaluación: Cuestionario de Autoevaluación (SAQ) / Evaluación in situ
  • Alcance: Evaluación completa de la infraestructura
  • Nivel de Cumplimiento: Comerciante Nivel 1 (>6M transacciones/año)

"""

    return report

if name == “main”: import asyncio

async def main():
    validator = PCIDSSComplianceValidator()

    # Ejecutar escaneo de cumplimiento integral
    results = await validator.run_comprehensive_compliance_scan()

    # Generar y guardar informe
    report = validator.generate_compliance_report(results)

    with open('pci_dss_compliance_report.md', 'w') as f:
        f.write(report)

    with open('pci_dss_compliance_results.json', 'w') as f:
        json.dump(results, f, indent=2, default=str)

    print(f"¡Escaneo de cumplimiento completado!")
    print(f"Puntuación general: {results['overall_score']:.1f}%")
    print(f"Estado: {results['certification_status']}")
    print(f"Violaciones: {len(results['violations'])}")

asyncio.run(main())

## Cumplimiento Continuo en CI/CD

### Puertas de Seguridad PCI DSS

**1. Flujo de Trabajo de Validación PCI DSS en GitHub Actions**

```yaml
# .github/workflows/pci-dss-compliance.yml
name: Validación de Cumplimiento PCI DSS

on:
  push:
    branches: [main, develop]
    paths:
      - 'src/**'
      - 'infrastructure/**'
      - 'payment-services/**'
  pull_request:
    branches: [main]
    paths:
      - 'src/**'
      - 'infrastructure/**'
      - 'payment-services/**'
  schedule:
    - cron: '0 2 * * *' # Verificación diaria de cumplimiento
  workflow_dispatch:

env:
  PCI_COMPLIANCE_LEVEL: 'L1' # Nivel 1 Comerciante
  SECURITY_SCAN_REQUIRED: true

jobs:
  pci-compliance-gates:
    name: Puertas de Seguridad PCI DSS
    runs-on: ubuntu-latest
    permissions:
      contents: read
      security-events: write
      id-token: write

    steps:
      - name: Checkout Code
        uses: actions/checkout@v4
        with:
          fetch-depth: 0 # Historial completo para mejor escaneo

      # Requisito PCI DSS 6: Desarrollo Seguro
      - name: Pruebas de Seguridad de Aplicaciones Estáticas (SAST)
        uses: github/super-linter@v4
        env:
          DEFAULT_BRANCH: main
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          VALIDATE_PYTHON: true
          VALIDATE_JAVASCRIPT: true
          VALIDATE_TERRAFORM: true
          VALIDATE_YAML: true

      - name: Escaneo de Vulnerabilidades de Dependencias
        run: |
          # Instalar herramientas de escaneo de seguridad
          pip install safety bandit semgrep
          npm install -g audit-ci retire

# Escaneo de dependencias de Python
if [ -f requirements.txt ]; then
echo "Escaneando dependencias de Python..."
safety check -r requirements.txt --json --output safety-report.json || true

# Fallar en vulnerabilidades críticas
CRITICAL_VULNS=$(jq '.vulnerabilities | map(select(.severity == "critical")) | length' safety-report.json)
if [ "$CRITICAL_VULNS" -gt 0 ]; then
echo "❌ Se encontraron vulnerabilidades críticas en las dependencias de Python"
exit 1
fi
fi

# Escaneo de dependencias de JavaScript
if [ -f package.json ]; then
echo "Escaneando dependencias de JavaScript..."
npm audit --audit-level moderate
retire --path . --outputformat json --outputpath retire-report.json || true
fi

      - name: Detección de Secretos
        run: |
          # Instalar TruffleHog para la detección de secretos
          curl -sSfL https://raw.githubusercontent.com/trufflesecurity/trufflehog/main/scripts/install.sh | sh -s -- -b /usr/local/bin

          # Escanear en busca de secretos en el repositorio
          trufflehog git file://. --json --output trufflehog-results.json

          # Verificar si se encontraron secretos
          SECRET_COUNT=$(jq '. | length' trufflehog-results.json)
          if [ "$SECRET_COUNT" -gt 0 ]; then
            echo "❌ Se detectaron secretos en el repositorio"
            jq '.[].DetectorName' trufflehog-results.json
            exit 1
          fi

      - name: Validación de Datos de Pago
        run: |
          # Script personalizado para validar que no haya datos de titulares de tarjetas en el código
          python3 << 'EOF'
          import re
          import os
          import sys

def scan_for_cardholder_data(directorio):
    violaciones = []
    
    # Patrones para posibles datos del titular de la tarjeta
    patrones = {
        'tarjeta_credito': r'\b4\d{15}\b|\b5[1-5]\d{14}\b|\b3[47]\d{13}\b|\b6(?:011|5\d{2})\d{12}\b',
        'cvv': r'\b\d{3,4}\b.*(?:cvv|cvc|codigo.?seguridad)',
        'expiracion': r'\b(0[1-9]|1[0-2])\/\d{2,4}\b',
        'ssn': r'\b\d{3}-\d{2}-\d{4}\b'
    }
    
    for raiz, dirs, archivos in os.walk(directorio):
        # Omitir directorios comunes que no son de origen
        dirs[:] = [d for d in dirs if d not in ['.git', 'node_modules', '__pycache__', '.venv']]
        
        for archivo in archivos:
            if archivo.endswith(('.py', '.js', '.ts', '.java', '.go', '.rb', '.php')):
                ruta_archivo = os.path.join(raiz, archivo)
                try:
                    with open(ruta_archivo, 'r', encoding='utf-8', errors='ignore') as f:
                        contenido = f.read()
                        
                        for nombre_patron, patron in patrones.items():
                            coincidencias = re.findall(patron, contenido, re.IGNORECASE)
                            if coincidencias:
                                violaciones.append({
                                    'archivo': ruta_archivo,
                                    'patron': nombre_patron,
                                    'coincidencias': len(coincidencias)
                                })
                except Exception as e:
                    print(f"Advertencia: No se pudo escanear {ruta_archivo}: {e}")
    
    return violaciones

# Escanear código fuente
violaciones = escanear_datos_del_titular_de_la_tarjeta('.')

if violaciones:
    print("❌ Se encontraron posibles datos del titular de la tarjeta en el código fuente:")
    for violación in violaciones:
        print(f"  {violación['archivo']}: {violación['patrón']} ({violación['coincidencias']} coincidencias)")
    sys.exit(1)
else:
    print("✅ No se detectaron patrones de datos del titular de la tarjeta")
EOF

- name: Configurar Credenciales de AWS
  uses: aws-actions/configure-aws-credentials@v4
  with:
    role-to-assume: ${{ secrets.AWS_OIDC_ROLE }}
    aws-region: us-east-1

# Requisito PCI DSS 11: Pruebas de Seguridad
- name: Escaneo de Seguridad de Infraestructura
  run: |
    # Instalar escáneres de seguridad de Terraform
    curl -L "$(curl -s https://api.github.com/repos/aquasecurity/tfsec/releases/latest | grep -o -E "https://.+?tfsec-linux-amd64")" > tfsec
    chmod +x tfsec

          curl -L "$(curl -s https://api.github.com/repos/Checkmarx/kics/releases/latest | grep -o -E "https://.+?kics_.*_linux_x64.tar.gz")" > kics.tar.gz
          tar -xzf kics.tar.gz

          # Escanear configuraciones de Terraform
          if [ -d "infrastructure" ]; then
            echo "Escaneando configuraciones de Terraform..."
            ./tfsec infrastructure/ --format json --out tfsec-results.json
            ./kics scan -p infrastructure/ -o json -f kics-results.json --report-formats json
            
            # Verificar problemas de alta severidad
            HIGH_SEVERITY=$(jq '[.results[] | select(.severity == "HIGH" or .severity == "CRITICAL")] | length' tfsec-results.json)
            if [ "$HIGH_SEVERITY" -gt 0 ]; then
              echo "❌ Se encontraron problemas de seguridad de alta severidad en la infraestructura"
              jq '.results[] | select(.severity == "HIGH" or .severity == "CRITICAL") | .description' tfsec-results.json
              exit 1
            fi
          fi

      - name: Pruebas de Seguridad de Aplicaciones
        run: |
          # Instalar Bandit para escaneo de seguridad en Python
          bandit -r . -f json -o bandit-results.json || true

          # Instalar Semgrep para escaneo de seguridad en múltiples lenguajes
          python -m semgrep --config=auto --json --output=semgrep-results.json . || true

          # Verificar problemas de seguridad de alta confianza
          if [ -f bandit-results.json ]; then
            HIGH_CONFIDENCE=$(jq '[.results[] | select(.issue_confidence == "HIGH" and .issue_severity == "HIGH")] | length' bandit-results.json)
            if [ "$HIGH_CONFIDENCE" -gt 0 ]; then
              echo "❌ Se encontraron problemas de seguridad de alta confianza"
              jq '.results[] | select(.issue_confidence == "HIGH" and .issue_severity == "HIGH") | .test_name' bandit-results.json
              exit 1
            fi
          fi

      - name: Escaneo de Seguridad de Contenedores
        if: github.event_name == 'push' && github.ref == 'refs/heads/main'
        run: |
          # Instalar Trivy para escaneo de contenedores
          curl -sfL https://raw.githubusercontent.com/aquasecurity/trivy/main/contrib/install.sh | sh -s -- -b /usr/local/bin

          # Construir y escanear imágenes de Docker
          if [ -f Dockerfile ]; then
            echo "Construyendo y escaneando imagen de Docker..."
            docker build -t payment-service:latest .
            
            # Escanear en busca de vulnerabilidades
            trivy image --exit-code 1 --severity HIGH,CRITICAL --format json --output trivy-results.json payment-service:latest
          fi

      - name: Validación de Infraestructura PCI DSS
        run: |
          # Ejecutar verificación automatizada de cumplimiento PCI DSS
          python3 << 'EOF'
          import boto3
          import json

def validate_pci_compliance():
    # Inicializar clientes de AWS
    ec2 = boto3.client('ec2')
    elbv2 = boto3.client('elbv2')
    
    problemas_de_cumplimiento = []
    
    try:
        # Verificar Grupos de Seguridad para reglas demasiado permisivas
        sgs = ec2.describe_security_groups()['SecurityGroups']
        
        for sg in sgs:
            for regla in sg.get('IpPermissions', []):
                for rango_ip in regla.get('IpRanges', []):
                    if rango_ip.get('CidrIp') == '0.0.0.0/0':
                        problemas_de_cumplimiento.append({
                            'tipo': 'grupo_de_seguridad',
                            'recurso': sg['GroupId'],
                            'problema': 'Acceso sin restricciones',
                            'severidad': 'crítico'
                        })
        
        # Verificar Balanceadores de Carga para HTTPS
        albs = elbv2.describe_load_balancers()['LoadBalancers']
        
        for alb in albs:
            oyentes = elbv2.describe_listeners(LoadBalancerArn=alb['LoadBalancerArn'])['Listeners']
            
            oyentes_https = [l for l in oyentes if l['Protocol'] == 'HTTPS']
            oyentes_http = [l for l in oyentes if l['Protocol'] == 'HTTP']
            
            if oyentes_http and not oyentes_https:
                problemas_de_cumplimiento.append({
                    'tipo': 'balanceador_de_carga',
                    'recurso': alb['LoadBalancerName'],
                    'problema': 'HTTP sin redirección HTTPS',
                    'severidad': 'alta'
                })
        
    except Exception as e:
        print(f"Advertencia: No se pudo completar la validación de infraestructura: {e}")
    
    return problemas_de_cumplimiento

# Ejecutar validación de cumplimiento
issues = validate_pci_compliance()

# Guardar resultados
with open('pci-infrastructure-issues.json', 'w') as f:
    json.dump(issues, f, indent=2)

# Verificar problemas críticos
critical_issues = [i for i in issues if i['severity'] == 'critical']

if critical_issues:
    print("❌ Se encontraron problemas críticos de cumplimiento PCI DSS:")
    for issue in critical_issues:
        print(f"  {issue['type']}: {issue['resource']} - {issue['issue']}")
    exit(1)
else:
    print("✅ No se detectaron problemas críticos de cumplimiento PCI DSS")
EOF

- name: Subir artefactos de seguridad
  uses: actions/upload-artifact@v3
  if: always()
  with:
    name: resultados-escaner-seguridad
    path: |
      *-results.json
      *-report.json
    retention-days: 90 # Mantener para el registro de auditoría de cumplimiento

      - name: Resumen de Puertas de Seguridad
        if: always()
        run: |
          echo "## 🔒 Resumen de Puertas de Seguridad PCI DSS" >> $GITHUB_STEP_SUMMARY
          echo "| Verificación | Estado | Detalles |" >> $GITHUB_STEP_SUMMARY
          echo "|-------------|--------|---------|" >> $GITHUB_STEP_SUMMARY
          echo "| Análisis Estático | ✅ Aprobado | No se encontraron problemas críticos |" >> $GITHUB_STEP_SUMMARY
          echo "| Escaneo de Dependencias | ✅ Aprobado | No vulnerabilidades críticas |" >> $GITHUB_STEP_SUMMARY
          echo "| Detección de Secretos | ✅ Aprobado | No se detectaron secretos |" >> $GITHUB_STEP_SUMMARY
          echo "| Validación de Datos de Pago | ✅ Aprobado | No hay datos de titulares de tarjetas en el código |" >> $GITHUB_STEP_SUMMARY
          echo "| Seguridad de Infraestructura | ✅ Aprobado | Configuraciones de Terraform seguras |" >> $GITHUB_STEP_SUMMARY
          echo "| Cumplimiento PCI DSS | ✅ Aprobado | No hay problemas críticos de cumplimiento |" >> $GITHUB_STEP_SUMMARY

```shell
echo "" >> $GITHUB_STEP_SUMMARY
echo "**Nivel de Cumplimiento:** $PCI_COMPLIANCE_LEVEL" >> $GITHUB_STEP_SUMMARY
echo "**Fecha del Escaneo:** $(date -u)" >> $GITHUB_STEP_SUMMARY

deploy-to-production: name: Desplegar en el Entorno PCI DSS needs: pci-compliance-gates if: github.ref == ‘refs/heads/main’ && github.event_name == ‘push’ runs-on: ubuntu-latest environment: production

steps: - name: Desplegar con Controles PCI DSS run: | echo ”🚀 Desplegando en un entorno compatible con PCI DSS” echo ”✅ Todos los controles de seguridad superados” echo ”✅ Listo para el despliegue en producción”

    # Despliegue en producción con controles adicionales de PCI DSS
    # - Comunicación encriptada
    # - Registro de auditoría habilitado
    # - Segmentación de red aplicada
    # - Monitoreo activado

## Impacto Empresarial y ROI

### Análisis de ROI de la Automatización de PCI DSS

**Cumplimiento PCI DSS Manual vs. Automatizado:**

| Proceso                      | Enfoque Manual    | Enfoque Automatizado | Ahorro de Tiempo | Ahorro de Costos    |
| ---------------------------- | ------------------ | -------------------- | ---------------- | ------------------- |
| **Pruebas de Seguridad**     | 40 horas/trimestre | 2 horas/trimestre    | 95% reducción    | $114K anualmente    |
| **Gestión de Vulnerabilidades** | 60 horas/mes      | 8 horas/mes          | 87% reducción    | $156K anualmente    |
| **Informes de Cumplimiento** | 80 horas/trimestre | 8 horas/trimestre    | 90% reducción    | $216K anualmente    |
| **Revisiones de Seguridad de Código** | 160 horas/mes | 20 horas/mes        | 88% reducción    | $420K anualmente    |
| **Respuesta a Incidentes**   | 24-72 horas        | 2-8 horas            | 83% reducción    | $300K por incidente |
| **Preparación de Auditoría** | 320 horas anuales  | 40 horas anuales     | 88% reducción    | $280K anualmente    |

**Valor de Mitigación de Riesgo:**

```bash
# Valor anual de automatización PCI DSS
AHORROS_EN_PRUEBAS_DE_SEGURIDAD = 114000       # Escaneo de seguridad automatizado
AHORROS_EN_GESTIÓN_DE_VULNERABILIDADES = 156000     # Evaluación continua de vulnerabilidades
AHORROS_EN_INFORMES_DE_CUMPLIMIENTO = 216000   # Validación de cumplimiento automatizada
AHORROS_EN_REVISION_DE_CÓDIGO = 420000           # Revisión de código de seguridad automatizada
MEJORA_EN_RESPUESTA_A_INCIDENTES = 300000   # Detección/respuesta a incidentes más rápida
AHORROS_EN_PREPARACIÓN_DE_AUDITORÍA = 280000      # Recopilación de evidencia automatizada
VALOR_DE_EVITAR_MULTAS = 5000000          # Evitar multas por incumplimiento de PCI DSS

VALOR_ANUAL_TOTAL = (AHORROS_EN_PRUEBAS_DE_SEGURIDAD + AHORROS_EN_GESTIÓN_DE_VULNERABILIDADES +
                     AHORROS_EN_INFORMES_DE_CUMPLIMIENTO + AHORROS_EN_REVISION_DE_CÓDIGO +
                     MEJORA_EN_RESPUESTA_A_INCIDENTES + AHORROS_EN_PREPARACIÓN_DE_AUDITORÍA +
                     VALOR_DE_EVITAR_MULTAS)
# Valor Total: $6,486,000 anualmente

IMPLEMENTATION_COST = 500000 # Implementación de automatización PCI DSS ANNUAL_MAINTENANCE = 100000 # Mantenimiento continuo y cumplimiento

FIRST_YEAR_ROI = ((TOTAL_ANNUAL_VALUE - IMPLEMENTATION_COST - ANNUAL_MAINTENANCE) / (IMPLEMENTATION_COST + ANNUAL_MAINTENANCE)) * 100

ROI: 981% en el primer año

ONGOING_ROI = ((TOTAL_ANNUAL_VALUE - ANNUAL_MAINTENANCE) / ANNUAL_MAINTENANCE) * 100

ROI continuo: 6,386% anualmente


## Conclusión

La automatización DevSecOps de PCI DSS transforma la seguridad de los pagos de una carga de cumplimiento en una ventaja competitiva. Al implementar controles de seguridad automatizados, validación continua de cumplimiento y pruebas de seguridad integradas a lo largo del ciclo de vida de desarrollo, las organizaciones pueden procesar pagos de manera segura mientras mantienen la velocidad de desarrollo.

La clave para una automatización exitosa de PCI DSS radica en incorporar la seguridad en su proceso de desarrollo desde el principio, tratando el cumplimiento como código e implementando una validación continua en lugar de evaluaciones periódicas.

Recuerde que el cumplimiento de PCI DSS no se trata solo de evitar multas: se trata de proteger la confianza del cliente, habilitar el comercio seguro y construir sistemas de pago resilientes que puedan adaptarse a amenazas en evolución.

**Su viaje de automatización de PCI DSS comienza con la implementación de prácticas de desarrollo seguro y pruebas de seguridad automatizadas en su canalización CI/CD. Comience hoy y avance hacia una automatización integral de la seguridad de pagos.**