Automatización SOC2 Tipo II: Recopilación de Evidencia con Infraestructura como Código

Automatización SOC2 Tipo II: Recopilación de Evidencia con Infraestructura como Código

El Desafío de Automatización SOC2 Tipo II

Su organización se somete a auditorías SOC2 Tipo II anualmente, requiriendo meses de recopilación manual de evidencia, documentación de políticas y validación de controles en cientos de sistemas y procesos. Los auditores solicitan evidencia que abarque 12 meses de operaciones, obligando a su equipo a recopilar retroactivamente registros, capturas de pantalla y documentación que puede estar incompleta o ser inconsistente. Este enfoque manual crea ansiedad por la auditoría, consume recursos significativos y proporciona una garantía limitada sobre la postura de seguridad real.

La automatización SOC2 Tipo II transforma el cumplimiento de una lucha periódica en una validación continua, proporcionando recopilación de evidencia en tiempo real, aplicación automatizada de políticas y rastros de auditoría completos que reducen el tiempo de auditoría de meses a semanas mientras mejoran la postura de seguridad real.

Marco SOC2 Tipo II para DevSecOps

SOC2 Tipo II evalúa la efectividad de los controles a lo largo del tiempo, requiriendo evidencia de implementación consistente en todos los Criterios de Servicio de Confianza: Seguridad, Disponibilidad, Integridad del Procesamiento, Confidencialidad y Privacidad. Los entornos modernos nativos de la nube permiten la recolección automatizada de evidencia y la validación continua del cumplimiento.

Componentes Principales de Automatización SOC2 Tipo II

1. Recolección Continua de Evidencia

  • Agregación y retención automatizada de registros en todos los sistemas
  • Monitoreo y validación de cumplimiento de políticas en tiempo real
  • Gestión de configuraciones y detección de desviaciones
  • Pistas de auditoría de gestión de identidad y acceso

2. Implementación Automatizada de Controles

  • Infraestructura como Código con controles de seguridad integrados
  • Política como Código para la aplicación automatizada de gobernanza
  • Monitoreo continuo de seguridad y alertas
  • Flujos de trabajo de remediación y respuesta automatizados

3. Generación de Pistas de Auditoría

  • Registros de auditoría inmutables con verificación criptográfica
  • Informes de cumplimiento automatizados y paneles de control
  • Empaquetado de evidencia y automatización de soporte de auditoría
  • Análisis histórico de cumplimiento y tendencias

Infraestructura como Código para Cumplimiento SOC2

La Infraestructura como Código proporciona la base para la implementación de controles de seguridad consistentes, auditables y repetibles en todos los entornos.

Plantillas de Infraestructura Cumplientes con SOC2

1. Módulos de Terraform Enfocados en la Seguridad

# terraform/modules/soc2-compliant-vpc/main.tf
terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
  }
}

# Control de Seguridad SOC2: Segmentación de Red (CC6.1)
resource "aws_vpc" "soc2_vpc" {
  cidr_block           = var.vpc_cidr
  enable_dns_hostnames = true
  enable_dns_support   = true

tags = merge(var.common_tags, { Name = ”${var.environment}-soc2-vpc” “SOC2:Control” = “CC6.1” “SOC2:Description” = “Segmentación de red y controles de acceso” “Compliance:Framework” = “SOC2-Type-II” “Audit:Required” = “true” }) }

Control de Seguridad SOC2: Registros de Flujo de VPC (CC7.2)

resource “aws_flow_log” “vpc_flow_log” { iam_role_arn = aws_iam_role.flow_log_role.arn log_destination = aws_cloudwatch_log_group.vpc_flow_log.arn traffic_type = “ALL” vpc_id = aws_vpc.soc2_vpc.id

tags = merge(var.common_tags, { Name = ”${var.environment}-vpc-flow-logs” “SOC2:Control” = “CC7.2” “SOC2:Description” = “Monitoreo y registro de red” “Audit:RetentionDays” = “2557” # 7 años para SOC2 }) }

Control de Seguridad SOC2: Registros de CloudWatch Encriptados (CC6.7)

resource “aws_cloudwatch_log_group” “vpc_flow_log” { name = “/aws/vpc/flowlogs/${var.environment}” retention_in_days = 2557 # Retención de 7 años para SOC2 kms_key_id = aws_kms_key.soc2_logging_key.arn

tags = merge(var.common_tags, { “SOC2:Control” = “CC6.7” “SOC2:Description” = “Registro de auditoría encriptado” “Audit:Critical” = “true” }) }

Control de Seguridad SOC2: Gestión de Claves de Encriptación (CC6.7)

resource “aws_kms_key” “soc2_logging_key” { description = “Clave de encriptación para cumplimiento SOC2” deletion_window_in_days = 30 enable_key_rotation = true

policy = jsonencode({ Version = “2012-10-17” Statement = [ { Sid = “Habilitar permisos de usuario IAM” Effect = “Allow” Principal = { AWS = “arn:aws:iam::${data.aws_caller_identity.current.account_id}:root” } Action = “kms:” Resource = "" }, { Sid = “Permitir registros de CloudWatch” Effect = “Allow” Principal = { Service = “logs.${data.aws_region.current.name}.amazonaws.com” } Action = [ “kms:Encrypt”, “kms:Decrypt”, “kms:ReEncrypt*”, “kms:GenerateDataKey*”, “kms:DescribeKey” ] Resource = ”*” Condition = { ArnEquals = { “kms:EncryptionContext:aws:logs:arn” = “arn:aws:logs:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:log-group:/aws/vpc/flowlogs/${var.environment}” } } } ] })

tags = merge(var.common_tags, { Name = ”${var.environment}-soc2-logging-key” “SOC2:Control” = “CC6.7” “SOC2:Description” = “Clave de cifrado para el registro de cumplimiento” “Audit:KeyRotation” = “habilitado” }) }

resource “aws_kms_alias” “soc2_logging_key_alias” { name = “alias/${var.environment}-soc2-logging” target_key_id = aws_kms_key.soc2_logging_key.key_id }

Control de Seguridad SOC2: Subredes Privadas (CC6.1)

resource “aws_subnet” “private_subnets” { count = length(var.private_subnet_cidrs)

vpc_id = aws_vpc.soc2_vpc.id cidr_block = var.private_subnet_cidrs[count.index] availability_zone = data.aws_availability_zones.available.names[count.index]

map_public_ip_on_launch = false

tags = merge(var.common_tags, { Name = ”${var.environment}-subred-privada-${count.index + 1}” Type = “Privada” “SOC2:Control” = “CC6.1” “SOC2:Description” = “Segmento de red privada aislado” “kubernetes.io/role/internal-elb” = “1” }) }

Control de Seguridad SOC2: Subredes Públicas con NACLs (CC6.1)

resource “aws_subnet” “public_subnets” { count = length(var.public_subnet_cidrs)

vpc_id = aws_vpc.soc2_vpc.id cidr_block = var.public_subnet_cidrs[count.index] availability_zone = data.aws_availability_zones.available.names[count.index]

map_public_ip_on_launch = true

tags = merge(var.common_tags, { Name = ”${var.environment}-subred-pública-${count.index + 1}” Type = “Pública” “SOC2:Control” = “CC6.1” “SOC2:Description” = “Acceso controlado a la red pública” “kubernetes.io/role/elb” = “1” }) }

SOC2 Control de Seguridad: ACLs de Red (CC6.1)

resource “aws_network_acl” “private_nacl” { vpc_id = aws_vpc.soc2_vpc.id subnet_ids = aws_subnet.private_subnets[*].id

Permitir HTTPS entrante desde subredes públicas

ingress { protocol = “tcp” rule_no = 100 action = “allow” cidr_block = var.vpc_cidr from_port = 443 to_port = 443 }

Permitir HTTP entrante desde subredes públicas (para verificaciones de salud)

ingress { protocol = “tcp” rule_no = 110 action = “allow” cidr_block = var.vpc_cidr from_port = 80 to_port = 80 }

Permitir puertos efímeros para respuestas

ingress { protocol = “tcp” rule_no = 120 action = “allow” cidr_block = “0.0.0.0/0” from_port = 1024 to_port = 65535 }

Permitir todo el tráfico saliente

egress { protocol = “-1” rule_no = 100 action = “allow” cidr_block = “0.0.0.0/0” from_port = 0 to_port = 0 }

tags = merge(var.common_tags, { Name = ”${var.environment}-private-nacl” “SOC2:Control” = “CC6.1” “SOC2:Description” = “Lista de control de acceso a la red para subredes privadas” }) }

Control de Seguridad SOC2: Rol IAM para Registros de Flujo (CC6.2)

resource “aws_iam_role” “flow_log_role” { name = ”${var.environment}-vpc-flow-log-role”

assume_role_policy = jsonencode({ Version = “2012-10-17” Statement = [ { Action = “sts:AssumeRole” Effect = “Allow” Principal = { Service = “vpc-flow-logs.amazonaws.com” } } ] })

tags = merge(var.common_tags, { “SOC2:Control” = “CC6.2” “SOC2:Description” = “Rol IAM para el servicio de registros de flujo de VPC” }) }

resource “aws_iam_role_policy” “flow_log_policy” { name = ”${var.environment}-vpc-flow-log-policy” role = aws_iam_role.flow_log_role.id

policy = jsonencode({ Version = “2012-10-17” Statement = [ { Action = [ “logs:CreateLogGroup”, “logs:CreateLogStream”, “logs:PutLogEvents”, “logs:DescribeLogGroups”, “logs:DescribeLogStreams” ] Effect = “Allow” Resource = aws_cloudwatch_log_group.vpc_flow_log.arn } ] }) }

Fuentes de datos

data “aws_caller_identity” “current” {} data “aws_region” “current” {} data “aws_availability_zones” “available” { state = “available” }

Variables

variable “environment” { description = “Nombre del entorno (por ejemplo, producción, staging)” type = string }

variable “vpc_cidr” { description = “Bloque CIDR para VPC” type = string default = “10.0.0.0/16” }

variable “private_subnet_cidrs” { description = “Bloques CIDR para subredes privadas” type = list(string) default = [“10.0.1.0/24”, “10.0.2.0/24”, “10.0.3.0/24”] }

variable “public_subnet_cidrs” { description = “Bloques CIDR para subredes públicas” type = list(string) default = [“10.0.101.0/24”, “10.0.102.0/24”, “10.0.103.0/24”] }

variable “common_tags” { description = “Etiquetas comunes para todos los recursos” type = map(string) default = { “Terraform” = “true” “Compliance:Framework” = “SOC2-Type-II” “Environment” = “producción” } }

Salidas

output “vpc_id” { description = “ID de la VPC” value = aws_vpc.soc2_vpc.id }

output “private_subnet_ids” { description = “IDs de las subredes privadas” value = aws_subnet.private_subnets[*].id }

output “public_subnet_ids” { description = “IDs de las subredes públicas” value = aws_subnet.public_subnets[*].id }

output “flow_log_group_name” { description = “Nombre del grupo de registros de flujo de VPC en CloudWatch” value = aws_cloudwatch_log_group.vpc_flow_log.name }

output “soc2_compliance_tags” { description = “Etiquetas de cumplimiento SOC2 aplicadas a recursos” value = { for key, value in var.common_tags : key => value if can(regex(”^(SOC2|Compliance|Audit):”, key)) } }


**2. Módulo de Seguridad de Aplicación SOC2**

```hcl
# terraform/modules/soc2-application/main.tf

# Control de Seguridad SOC2: Clúster ECS con Configuración de Seguridad (CC6.1)
resource "aws_ecs_cluster" "soc2_cluster" {
  name = "${var.application_name}-${var.environment}"

  setting {
    name  = "containerInsights"
    value = "enabled"
  }

  configuration {
    execute_command_configuration {
      kms_key_id = aws_kms_key.ecs_exec_key.arn
      logging    = "OVERRIDE"

      log_configuration {
        cloud_watch_encryption_enabled = true
        cloud_watch_log_group_name     = aws_cloudwatch_log_group.ecs_exec_logs.name
      }
    }
  }

tags = merge(var.common_tags, {
    Name                    = "${var.application_name}-${var.environment}-cluster"
    "SOC2:Control"         = "CC6.1"
    "SOC2:Description"     = "Plataforma de orquestación de contenedores segura"
    "Audit:ContainerInsights" = "habilitado"
  })
}

# Control de Seguridad SOC2: Balanceador de Carga de Aplicaciones con SSL (CC6.7)
resource "aws_lb" "application_lb" {
  name               = "${var.application_name}-${var.environment}-alb"
  internal           = false
  load_balancer_type = "application"
  security_groups    = [aws_security_group.alb_sg.id]
  subnets            = var.public_subnet_ids

  enable_deletion_protection = var.environment == "production" ? true : false
  enable_http2               = true

  access_logs {
    bucket  = aws_s3_bucket.alb_logs.bucket
    prefix  = "alb-logs"
    enabled = true
  }

tags = merge(var.common_tags, {
    Name                    = "${var.application_name}-${var.environment}-alb"
    "SOC2:Control"         = "CC6.7"
    "SOC2:Description"     = "Cifrado del balanceador de carga de aplicaciones"
    "Audit:AccessLogs"     = "habilitado"
  })
}

# Control de Seguridad SOC2: Bucket S3 para Logs de ALB (CC7.2)
resource "aws_s3_bucket" "alb_logs" {
  bucket        = "${var.application_name}-${var.environment}-alb-logs-${random_string.bucket_suffix.result}"
  force_destroy = var.environment != "production"

  tags = merge(var.common_tags, {
    Name                    = "${var.application_name}-${var.environment}-alb-logs"
    "SOC2:Control"         = "CC7.2"
    "SOC2:Description"     = "Logs de acceso para el balanceador de carga de aplicaciones"
    "Audit:RetentionYears" = "7"
  })
}

resource "aws_s3_bucket_versioning" "alb_logs" {
  bucket = aws_s3_bucket.alb_logs.id
  versioning_configuration {
    status = "Enabled"
  }
}

resource "aws_s3_bucket_encryption" "alb_logs" {
  bucket = aws_s3_bucket.alb_logs.id

  server_side_encryption_configuration {
    rule {
      apply_server_side_encryption_by_default {
        kms_master_key_id = aws_kms_key.s3_key.arn
        sse_algorithm     = "aws:kms"
      }
    }
  }
}

resource "aws_s3_bucket_lifecycle_configuration" "alb_logs" {
  bucket = aws_s3_bucket.alb_logs.id

  rule {
    id     = "soc2_compliance_retention"
    status = "Enabled"

    transition {
      days          = 30
      storage_class = "STANDARD_IA"
    }

    transition {
      days          = 90
      storage_class = "GLACIER"
    }

    transition {
      days          = 365
      storage_class = "DEEP_ARCHIVE"
    }

    expiration {
      days = 2557  # 7 años para cumplimiento SOC2
    }
  }
}

# Control de Seguridad SOC2: Definición de Tarea ECS con Contexto de Seguridad (CC6.1)
resource "aws_ecs_task_definition" "app_task" {
  family                   = "${var.application_name}-${var.environment}"
  network_mode             = "awsvpc"
  requires_compatibilities = ["FARGATE"]
  cpu                      = var.task_cpu
  memory                   = var.task_memory
  execution_role_arn       = aws_iam_role.ecs_execution_role.arn
  task_role_arn           = aws_iam_role.ecs_task_role.arn

  container_definitions = jsonencode([
    {
      name      = var.application_name
      image     = var.container_image
      essential = true

      portMappings = [
        {
          containerPort = var.container_port
          protocol      = "tcp"
        }
      ]

# Control de Seguridad SOC2: Registro de Aplicaciones (CC7.2)
logConfiguration = {
  logDriver = "awslogs"
  options = {
    awslogs-group         = aws_cloudwatch_log_group.app_logs.name
    awslogs-region        = data.aws_region.current.name
    awslogs-stream-prefix = "ecs"
  }
}

# Control de Seguridad SOC2: Seguridad de Variables de Entorno
environment = [
  {
    name  = "ENVIRONMENT"
    value = var.environment
  },
  {
    name  = "LOG_LEVEL"
    value = var.environment == "production" ? "INFO" : "DEBUG"
  }
]

# Control de Seguridad SOC2: Gestión de Secretos (CC6.7)
secrets = [
  {
    name      = "DATABASE_PASSWORD"
    valueFrom = aws_secretsmanager_secret.app_secrets.arn
  }
]

# Control de Seguridad SOC2: Seguridad de Contenedores
readonlyRootFilesystem = true
user                   = "1001"  # Usuario no root

      healthCheck = {
        command     = ["CMD-SHELL", "curl -f http://localhost:${var.container_port}/health || exit 1"]
        interval    = 30
        timeout     = 5
        retries     = 3
        startPeriod = 60
      }
    }
  ])

  tags = merge(var.common_tags, {
    Name                    = "${var.application_name}-${var.environment}-task"
    "SOC2:Control"         = "CC6.1"
    "SOC2:Description"     = "Definición de tarea de contenedor seguro"
    "Audit:ReadOnlyRoot"   = "true"
    "Audit:NonRootUser"    = "true"
  })
}

# Control de Seguridad SOC2: Grupos de Logs de CloudWatch con Encriptación (CC6.7)
resource "aws_cloudwatch_log_group" "app_logs" {
  name              = "/ecs/${var.application_name}-${var.environment}"
  retention_in_days = 2557  # 7 años para SOC2
  kms_key_id        = aws_kms_key.cloudwatch_key.arn

tags = merge(var.common_tags, {
    Name                    = "${var.application_name}-${var.environment}-logs"
    "SOC2:Control"         = "CC6.7"
    "SOC2:Description"     = "Registros de aplicación cifrados"
    "Audit:RetentionDays"  = "2557"
  })
}

resource "aws_cloudwatch_log_group" "ecs_exec_logs" {
  name              = "/ecs/exec/${var.application_name}-${var.environment}"
  retention_in_days = 90  # 90 días para registros de acceso ejecutivo
  kms_key_id        = aws_kms_key.ecs_exec_key.arn

  tags = merge(var.common_tags, {
    Name                    = "${var.application_name}-${var.environment}-exec-logs"
    "SOC2:Control"         = "CC6.2"
    "SOC2:Description"     = "Registros de acceso ejecutivo de contenedores"
    "Audit:AccessType"     = "administrativo"
  })
}

# Control de Seguridad SOC2: Claves KMS para Encriptación (CC6.7)
resource "aws_kms_key" "cloudwatch_key" {
  description             = "Clave KMS para encriptación de registros de CloudWatch"
  deletion_window_in_days = 30
  enable_key_rotation     = true

  tags = merge(var.common_tags, {
    Name                    = "${var.application_name}-${var.environment}-cloudwatch-key"
    "SOC2:Control"         = "CC6.7"
    "SOC2:Description"     = "Clave de encriptación para registros de CloudWatch"
    "Audit:KeyRotation"    = "enabled"
  })
}

resource "aws_kms_key" "ecs_exec_key" {
  description             = "Clave KMS para encriptación de ECS Exec"
  deletion_window_in_days = 30
  enable_key_rotation     = true

  tags = merge(var.common_tags, {
    Name                    = "${var.application_name}-${var.environment}-ecs-exec-key"
    "SOC2:Control"         = "CC6.7"
    "SOC2:Description"     = "Clave de encriptación para sesiones de ECS Exec"
  })
}

resource "aws_kms_key" "s3_key" {
  description             = "Clave KMS para cifrado S3"
  deletion_window_in_days = 30
  enable_key_rotation     = true

  tags = merge(var.common_tags, {
    Name                    = "${var.application_name}-${var.environment}-s3-key"
    "SOC2:Control"         = "CC6.7"
    "SOC2:Description"     = "Clave de cifrado para los buckets S3"
  })
}

# Control de Seguridad SOC2: Secrets Manager (CC6.7)
resource "aws_secretsmanager_secret" "app_secrets" {
  name                    = "${var.application_name}-${var.environment}-secrets"
  description             = "Secretos de la aplicación para ${var.application_name}"
  kms_key_id             = aws_kms_key.secrets_key.arn
  recovery_window_in_days = var.environment == "production" ? 30 : 0

  tags = merge(var.common_tags, {
    Name                    = "${var.application_name}-${var.environment}-secrets"
    "SOC2:Control"         = "CC6.7"
    "SOC2:Description"     = "Gestión de secretos cifrados"
    "Audit:SecretRotation" = "habilitado"
  })
}

resource "aws_kms_key" "secrets_key" {
  description             = "Clave KMS para Secrets Manager"
  deletion_window_in_days = 30
  enable_key_rotation     = true

  tags = merge(var.common_tags, {
    Name                    = "${var.application_name}-${var.environment}-secrets-key"
    "SOC2:Control"         = "CC6.7"
    "SOC2:Description"     = "Clave de cifrado para secretos"
  })
}

# Cadena aleatoria para nombrado único de bucket
resource "random_string" "bucket_suffix" {
  length  = 8
  special = false
  upper   = false
}

# Fuentes de datos
data "aws_region" "current" {}
data "aws_caller_identity" "current" {}

Recopilación Automatizada de Evidencia SOC2

Sistema de Monitoreo de Cumplimiento Continuo

1. Automatización de la Recolección de Evidencias SOC2

#!/usr/bin/env python3
# soc2-automation/evidence_collector.py

import boto3
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import pandas as pd
from pathlib import Path
import hashlib

class SOC2EvidenceCollector:
    def __init__(self, aws_profile: str = None):
        self.session = boto3.Session(profile_name=aws_profile)
        self.logger = logging.getLogger(__name__)

        # Inicializar clientes de AWS
        self.cloudtrail = self.session.client('cloudtrail')
        self.cloudwatch = self.session.client('cloudwatch')
        self.logs = self.session.client('logs')
        self.config = self.session.client('config')
        self.iam = self.session.client('iam')
        self.s3 = self.session.client('s3')

Mapeo de Criterios de Servicio de Confianza SOC2

self.trust_criteria = { ‘CC1’: ‘Ambiente de Control’, ‘CC2’: ‘Comunicación e Información’, ‘CC3’: ‘Evaluación de Riesgos’, ‘CC4’: ‘Actividades de Monitoreo’, ‘CC5’: ‘Actividades de Control’, ‘CC6’: ‘Controles de Acceso Lógico y Físico’, ‘CC7’: ‘Operaciones del Sistema’, ‘CC8’: ‘Gestión de Cambios’, ‘CC9’: ‘Mitigación de Riesgos’ }

def collect_comprehensive_evidence(self, start_date: datetime, end_date: datetime) -> Dict: """Recopilar evidencia integral SOC2 Tipo II"""

evidence_package = { ‘collection_metadata’: { ‘collection_date’: datetime.now().isoformat(), ‘evidence_period_start’: start_date.isoformat(), ‘evidence_period_end’: end_date.isoformat(), ‘collector_version’: ‘2.0’, ‘aws_account_id’: self.session.client(‘sts’).get_caller_identity()[‘Account’] }, ‘control_evidence’: {} }

Recopilar evidencia para cada Criterio de Servicio de Confianza

for criteria_id, description in self.trust_criteria.items(): self.logger.info(f”Recopilando evidencia para {criteria_id}: {description}”) evidence_package[‘control_evidence’][criteria_id] =
self._collect_criteria_evidence(criteria_id, start_date, end_date)

Generar hashes de integridad de la evidencia

evidence_package[‘integrity’] = self._generate_evidence_integrity(evidence_package)

return evidence_package

def _collect_criteria_evidence(self, criteria_id: str,
                                 start_date: datetime, end_date: datetime) -> Dict:
    """Recopilar evidencia para criterios específicos de servicio de confianza"""

    evidence = {
        'criteria_id': criteria_id,
        'criteria_description': self.trust_criteria[criteria_id],
        'evidence_items': [],
        'metrics': {},
        'compliance_status': 'PENDIENTE'
    }
    if criteria_id == 'CC1':  # Ambiente de Control
        evidence['evidence_items'].extend(self._collect_cc1_evidence(start_date, end_date))
    elif criteria_id == 'CC2':  # Comunicación e Información
        evidence['evidence_items'].extend(self._collect_cc2_evidence(start_date, end_date))
    elif criteria_id == 'CC6':  # Controles de Acceso Lógico y Físico
        evidence['evidence_items'].extend(self._collect_cc6_evidence(start_date, end_date))
    elif criteria_id == 'CC7':  # Operaciones del Sistema
        evidence['evidence_items'].extend(self._collect_cc7_evidence(start_date, end_date))
    elif criteria_id == 'CC8':  # Gestión de Cambios
        evidence['evidence_items'].extend(self._collect_cc8_evidence(start_date, end_date))

Calcular métricas de cumplimiento

evidence[‘metrics’] = self._calculate_compliance_metrics(evidence[‘evidence_items’]) evidence[‘compliance_status’] = self._determine_compliance_status(evidence[‘metrics’])

return evidence

def _collect_cc1_evidence(self, start_date: datetime, end_date: datetime) -> List[Dict]: """Recopilar evidencia de CC1 (Ambiente de Control)"""

evidence_items = []

# CC1.1: La gestión establece estructuras, líneas de reporte y autoridades
org_policies = self._get_iam_policies_evidence()
evidence_items.append({
    'control_id': 'CC1.1',
    'control_description': 'Estructura organizacional y autoridad',
    'evidence_type': 'iam_policies',
    'evidence_data': org_policies,
    'collection_timestamp': datetime.now().isoformat(),
    'automated': True
})

CC1.2: La junta directiva y la gerencia establecen responsabilidades de supervisión

governance_evidence = self._get_governance_evidence(start_date, end_date) evidence_items.append({ ‘control_id’: ‘CC1.2’, ‘control_description’: ‘Gobernanza y supervisión’, ‘evidence_type’: ‘actividades_de_gobernanza’, ‘evidence_data’: governance_evidence, ‘collection_timestamp’: datetime.now().isoformat(), ‘automated’: True })

return evidence_items

def _collect_cc6_evidence(self, start_date: datetime, end_date: datetime) -> List[Dict]: """Recopilar evidencia de CC6 (Controles de Acceso Lógico y Físico)"""

evidence_items = []

CC6.1: Los controles de acceso restringen el acceso no autorizado

access_controls = self._get_access_control_evidence(start_date, end_date) evidence_items.append({ ‘control_id’: ‘CC6.1’, ‘control_description’: ‘Controles de acceso lógico’, ‘evidence_type’: ‘access_controls’, ‘evidence_data’: access_controls, ‘collection_timestamp’: datetime.now().isoformat(), ‘automated’: True })

CC6.2: El acceso privilegiado está restringido y monitoreado

privileged_access = self._get_privileged_access_evidence(start_date, end_date) evidence_items.append({ ‘control_id’: ‘CC6.2’, ‘control_description’: ‘Monitoreo de acceso privilegiado’, ‘evidence_type’: ‘privileged_access_logs’, ‘evidence_data’: privileged_access, ‘collection_timestamp’: datetime.now().isoformat(), ‘automated’: True })

CC6.7: Controles de transmisión y eliminación de datos

encryption_evidence = self._get_encryption_evidence() evidence_items.append({ ‘control_id’: ‘CC6.7’, ‘control_description’: ‘Transmisión de datos y encriptación’, ‘evidence_type’: ‘configuración_de_encriptación’, ‘evidence_data’: encryption_evidence, ‘collection_timestamp’: datetime.now().isoformat(), ‘automated’: True })

return evidence_items

def _collect_cc7_evidence(self, start_date: datetime, end_date: datetime) -> List[Dict]: """Recopilar evidencia de CC7 (Operaciones del sistema)"""

evidence_items = []

CC7.1: Monitoreo de capacidad y rendimiento del sistema

performance_monitoring = self._get_performance_monitoring_evidence(start_date, end_date) evidence_items.append({ ‘control_id’: ‘CC7.1’, ‘control_description’: ‘Monitoreo de rendimiento y capacidad’, ‘evidence_type’: ‘performance_metrics’, ‘evidence_data’: performance_monitoring, ‘collection_timestamp’: datetime.now().isoformat(), ‘automated’: True })

CC7.2: Monitoreo del sistema para eventos de seguridad

security_monitoring = self._get_security_monitoring_evidence(start_date, end_date) evidence_items.append({ ‘control_id’: ‘CC7.2’, ‘control_description’: ‘Monitoreo de eventos de seguridad’, ‘evidence_type’: ‘security_logs’, ‘evidence_data’: security_monitoring, ‘collection_timestamp’: datetime.now().isoformat(), ‘automated’: True })

    return evidence_items

def _collect_cc8_evidence(self, start_date: datetime, end_date: datetime) -> List[Dict]:
    """Recopilar evidencia de CC8 (Gestión de Cambios)"""

    evidence_items = []

    # CC8.1: Proceso de gestión de cambios
    change_management = self._get_change_management_evidence(start_date, end_date)
    evidence_items.append({
        'control_id': 'CC8.1',
        'control_description': 'Proceso de gestión de cambios',
        'evidence_type': 'cambios_infraestructura',
        'evidence_data': change_management,
        'collection_timestamp': datetime.now().isoformat(),
        'automated': True
    })

    return evidence_items

def _get_access_control_evidence(self, start_date: datetime, end_date: datetime) -> Dict:
    """Recopilar evidencia de control de acceso desde CloudTrail"""

Consultar CloudTrail para eventos de autenticación y autorización

    events = self.cloudtrail.lookup_events(
        LookupAttributes=[
            {
                'AttributeKey': 'EventName',
                'AttributeValue': 'AssumeRole'
            }
        ],
        StartTime=start_date,
        EndTime=end_date,
        MaxItems=1000
    )

    access_events = []
    for event in events.get('Events', []):
        access_events.append({
            'event_time': event['EventTime'].isoformat(),
            'event_name': event['EventName'],
            'user_identity': event.get('UserIdentity', {}),
            'source_ip': event.get('SourceIPAddress'),
            'user_agent': event.get('UserAgent'),
            'resources': event.get('Resources', []),
            'cloud_trail_event': event.get('CloudTrailEvent')
        })

Obtener la configuración actual de IAM

iam_users = self.iam.list_users() iam_roles = self.iam.list_roles()

{ ‘access_events_count’: len(eventos_de_acceso), ‘access_events_sample’: eventos_de_acceso[:50], # Primeros 50 para auditoría ‘iam_users_count’: len(usuarios_iam[‘Usuarios’]), ‘iam_roles_count’: len(roles_iam[‘Roles’]), ‘user_summary’: [ { ‘username’: usuario[‘NombreUsuario’], ‘create_date’: usuario[‘FechaCreacion’].isoformat(), ‘password_last_used’: usuario.get(‘UltimaContraseñaUsada’, ‘Nunca’).isoformat() if usuario.get(‘UltimaContraseñaUsada’) != ‘Nunca’ else ‘Nunca’ } for usuario in usuarios_iam[‘Usuarios’] ], ‘role_summary’: [ { ‘role_name’: rol[‘NombreRol’], ‘create_date’: rol[‘FechaCreacion’].isoformat(), ‘assume_role_policy’: rol[‘DocumentoPoliticaAsumirRol’] } for rol in roles_iam[‘Roles’] ] }

def _get_privileged_access_evidence(self, start_date: datetime, end_date: datetime) -> Dict:
    """Recopilar evidencia de acceso privilegiado"""

    # Consulta para operaciones privilegiadas
    privileged_events = self.cloudtrail.lookup_events(
        LookupAttributes=[
            {
                'AttributeKey': 'EventName',
                'AttributeValue': 'CreateUser'
            }
        ],
        StartTime=start_date,
        EndTime=end_date
    )

    # También verificar el acceso a la consola administrativa
    console_events = self.cloudtrail.lookup_events(
        LookupAttributes=[
            {
                'AttributeKey': 'EventName',
                'AttributeValue': 'ConsoleLogin'
            }
        ],
        StartTime=start_date,
        EndTime=end_date
    )
    return {
        'operaciones_privilegiadas': len(privileged_events.get('Events', [])),
        'inicios_de_sesión_en_consola': len(console_events.get('Events', [])),
        'muestra_de_eventos_privilegiados': [
            {
                'hora_del_evento': event['EventTime'].isoformat(),
                'nombre_del_evento': event['EventName'],
                'identidad_del_usuario': event.get('UserIdentity', {}),
                'ip_de_origen': event.get('SourceIPAddress')
            }
            for event in privileged_events.get('Events', [])[:20]
        ],
        'muestra_de_inicio_de_sesión_en_consola': [
            {
                'hora_del_evento': event['EventTime'].isoformat(),
                'identidad_del_usuario': event.get('UserIdentity', {}),
                'ip_de_origen': event.get('SourceIPAddress'),
                'mfa_usado': 'Sí' if event.get('CloudTrailEvent', '').find('"mfaUsed":"true"') > -1 else 'No'
            }
            for event in console_events.get('Events', [])[:20]
        ]
    }
def _get_encryption_evidence(self) -> Dict:
    """Recopilar evidencia de configuración de cifrado"""

    # Obtener claves KMS
    kms_client = self.session.client('kms')
    keys = kms_client.list_keys()

    encryption_evidence = {
        'kms_keys_count': len(keys['Keys']),
        'kms_keys_details': []
    }

    for key in keys['Keys'][:20]:  # Muestra los primeros 20
        try:
            key_details = kms_client.describe_key(KeyId=key['KeyId'])
            key_rotation = kms_client.get_key_rotation_status(KeyId=key['KeyId'])
            encryption_evidence['kms_keys_details'].append({
                'key_id': key['KeyId'],
                'key_arn': key['Arn'],
                'description': key_details['KeyMetadata'].get('Description', ''),
                'key_usage': key_details['KeyMetadata'].get('KeyUsage', ''),
                'key_state': key_details['KeyMetadata'].get('KeyState', ''),
                'creation_date': key_details['KeyMetadata'].get('CreationDate', '').isoformat()
                               if key_details['KeyMetadata'].get('CreationDate') else '',
                'rotation_enabled': key_rotation.get('KeyRotationEnabled', False)
            })
        except Exception as e:
            self.logger.warning(f"No se pudieron obtener detalles para la clave {key['KeyId']}: {str(e)}")

    # Obtener cifrado de los buckets de S3
    s3_buckets = self.s3.list_buckets()
    bucket_encryption = []

    for bucket in s3_buckets['Buckets'][:10]:  # Muestra los primeros 10
        try:
            encryption = self.s3.get_bucket_encryption(Bucket=bucket['Name'])
            bucket_encryption.append({
                'bucket_name': bucket['Name'],
                'encryption_algorithm': encryption['ServerSideEncryptionConfiguration']['Rules'][0]['ApplyServerSideEncryptionByDefault']['SSEAlgorithm'],
                'kms_key_id': encryption['ServerSideEncryptionConfiguration']['Rules'][0]['ApplyServerSideEncryptionByDefault'].get('KMSMasterKeyID', 'Default')
            })
        except:
            bucket_encryption.append({
                'bucket_name': bucket['Name'],
                'encryption_algorithm': 'None',
                'kms_key_id': 'None'
            })

    encryption_evidence['s3_bucket_encryption'] = bucket_encryption

    return encryption_evidence

def _get_security_monitoring_evidence(self, start_date: datetime, end_date: datetime) -> Dict:
    """Recopilar evidencia de monitoreo de seguridad"""

    # Obtener alarmas de CloudWatch
    alarms = self.cloudwatch.describe_alarms()

    # Obtener grupos de registros para monitoreo de seguridad
    log_groups = self.logs.describe_log_groups()

    # Obtener eventos de CloudTrail relacionados con la seguridad
    security_events = self.cloudtrail.lookup_events(
        LookupAttributes=[
            {
                'AttributeKey': 'EventName',
                'AttributeValue': 'CreateSecurityGroup'
            }
        ],
        StartTime=start_date,
        EndTime=end_date
    )

    return {
        'conteo_alarmas_cloudwatch': len(alarmas['MetricAlarms']),
        'grupos_de_logs_de_seguridad': [
            lg['logGroupName'] for lg in grupos_de_logs['logGroups']
            if any(palabra_clave in lg['logGroupName'].lower() for palabra_clave in ['security', 'auth', 'access', 'audit'])
        ],
        'conteo_eventos_de_seguridad': len(eventos_de_seguridad.get('Events', [])),
        'metricas_de_monitoreo': {
            'alarmas_en_estado_de_alarma': len([alarma for alarma in alarmas['MetricAlarms'] if alarma['StateValue'] == 'ALARM']),
            'alarmas_en_estado_ok': len([alarma for alarma in alarmas['MetricAlarms'] if alarma['StateValue'] == 'OK']),
            'grupos_de_logs_con_retencion': len([lg for lg in grupos_de_logs['logGroups'] if lg.get('retentionInDays')])
        }
    }

def _obtener_evidencia_de_gestion_de_cambios(self, fecha_inicio: datetime, fecha_fin: datetime) -> Dict:
    """Recopilar evidencia de gestión de cambios"""

Obtener eventos de pila de CloudFormation

cf_client = self.session.client(‘cloudformation’) stacks = cf_client.list_stacks(StackStatusFilter=[‘CREATE_COMPLETE’, ‘UPDATE_COMPLETE’])

change_events = [] for stack in stacks[‘StackSummaries’][:20]: # Muestra los primeros 20 try: events = cf_client.describe_stack_events(StackName=stack[‘StackName’]) stack_changes = [ { ‘stack_name’: stack[‘StackName’], ‘event_id’: event[‘EventId’], ‘timestamp’: event[‘Timestamp’].isoformat(), ‘resource_type’: event.get(‘ResourceType’), ‘resource_status’: event.get(‘ResourceStatus’), ‘resource_status_reason’: event.get(‘ResourceStatusReason’) } for event in events[‘StackEvents’] if start_date <= event[‘Timestamp’].replace(tzinfo=None) <= end_date ] change_events.extend(stack_changes) except Exception as e: self.logger.warning(f”No se pudieron obtener eventos para la pila {stack[‘StackName’]}: {str(e)}“)

Obtener cambios de estado de instancia EC2

    ec2_events = self.cloudtrail.lookup_events(
        LookupAttributes=[
            {
                'AttributeKey': 'EventName',
                'AttributeValue': 'RunInstances'
            }
        ],
        StartTime=start_date,
        EndTime=end_date
    )

    return {
        'cambios_cloudformation': len(change_events),
        'lanzamientos_instancia_ec2': len(ec2_events.get('Events', [])),
        'muestra_eventos_cambio': change_events[:50],
        'resumen_cambio_infraestructura': {
            'cambios_totales': len(change_events) + len(ec2_events.get('Events', [])),
            'pilas_cloudformation': len(stacks['StackSummaries']),
            'frecuencia_cambio_por_día': len(change_events) / max((end_date - start_date).days, 1)
        }
    }

def _calculate_compliance_metrics(self, evidence_items: List[Dict]) -> Dict:
    """Calcular métricas de cumplimiento a partir de evidencia"""

    metrics = {
        'total_controls': len(evidence_items),
        'automated_controls': len([item for item in evidence_items if item.get('automated', False)]),
        'manual_controls': len([item for item in evidence_items if not item.get('automated', True)]),
        'evidence_completeness': 0,
        'control_effectiveness': 0
    }

    # Calcular la integridad de la evidencia
    complete_evidence = len([item for item in evidence_items if item.get('evidence_data')])
    metrics['evidence_completeness'] = (complete_evidence / len(evidence_items) * 100) if evidence_items else 0

Calcular la efectividad del control (puntuación simplificada)

effectiveness_scores = [] for item in evidence_items: evidence_data = item.get(‘evidence_data’, {}) if isinstance(evidence_data, dict): # Puntuación simple basada en la riqueza de la evidencia score = min(100, len(str(evidence_data)) / 100) # Puntuación básica effectiveness_scores.append(score)

metrics[‘control_effectiveness’] = sum(effectiveness_scores) / len(effectiveness_scores) if effectiveness_scores else 0

return metrics

def _determine_compliance_status(self, metrics: Dict) -> str: """Determinar el estado general de cumplimiento"""

completeness = metrics.get('evidence_completeness', 0)
effectiveness = metrics.get('control_effectiveness', 0)

    if completeness >= 95 and effectiveness >= 80:
        return 'CUMPLE'
    elif completeness >= 85 and effectiveness >= 70:
        return 'MAYORMENTE_CUMPLE'
    elif completeness >= 70:
        return 'PARCIALMENTE_CUMPLE'
    else:
        return 'NO_CUMPLE'

def _generar_integridad_evidencia(self, paquete_evidencia: Dict) -> Dict:
    """Generar hashes de integridad para el paquete de evidencia"""

    # Crear hash de los datos de evidencia
    cadena_evidencia = json.dumps(paquete_evidencia['control_evidence'], sort_keys=True)
    hash_evidencia = hashlib.sha256(cadena_evidencia.encode()).hexdigest()

    return {
        'hash_evidencia': hash_evidencia,
        'marca_tiempo_coleccion': datetime.now().isoformat(),
        'algoritmo_integridad': 'SHA256',
        'tamaño_evidencia_bytes': len(cadena_evidencia)
    }
def generate_audit_report(self, evidence_package: Dict) -> str:
    """Generar informe de auditoría legible para humanos"""

    report = f"""
# Informe de Recolección de Evidencia SOC2 Tipo II
Generado: {evidence_package['collection_metadata']['collection_date']}

## Resumen de la Recolección
- **Período de Evidencia**: {evidence_package['collection_metadata']['evidence_period_start']} a {evidence_package['collection_metadata']['evidence_period_end']}
- **Cuenta de AWS**: {evidence_package['collection_metadata']['aws_account_id']}
- **Método de Recolección**: Automatizado
- **Hash de Integridad de Evidencia**: {evidence_package['integrity']['evidence_hash']}

## Resumen de Evidencia de Control

"""
    for criteria_id, evidence in evidence_package['control_evidence'].items():
        report += f"""

{criteria_id}: {evidence[‘criteria_description’]}

  • Estado de Cumplimiento: {evidence[‘compliance_status’]}
  • Elementos de Evidencia: {len(evidence[‘evidence_items’])}
  • Integridad de la Evidencia: {evidence[‘metrics’][‘evidence_completeness’]:.1f}%
  • Efectividad del Control: {evidence[‘metrics’][‘control_effectiveness’]:.1f}%

"""

        for item in evidence['evidence_items']:
            report += f"""

{item[‘control_id’]}: {item[‘control_description’]}

  • Tipo de Evidencia: {item[‘evidence_type’]}
  • Método de Recolección: {‘Automatizado’ if item[‘automated’] else ‘Manual’}
  • Marca de Tiempo: {item[‘collection_timestamp’]}

"""

    return report

if name == “main”: # Ejemplo de uso collector = SOC2EvidenceCollector()

# Recopilar evidencia de los últimos 90 días
end_date = datetime.now()
start_date = end_date - timedelta(days=90)

evidencia = colector.recolectar_evidencia_comprensiva(fecha_inicio, fecha_fin)

# Guardar paquete de evidencia
con open('paquete-evidencia-soc2.json', 'w') como f:
    json.dump(evidencia, f, indent=2, default=str)

# Generar informe de auditoría
informe = colector.generar_informe_auditoría(evidencia)
con open('informe-auditoría-soc2.md', 'w') como f:
    f.write(informe)

print("¡Recolección de evidencia SOC2 completada!")
print(f"Paquete de evidencia: paquete-evidencia-soc2.json")
print(f"Informe de auditoría: informe-auditoría-soc2.md")

**2. Tablero de Cumplimiento Automatizado**

```python
#!/usr/bin/env python3
# soc2-automatización/tablero_cumplimiento.py

import boto3
import json
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from datetime import datetime, timedelta
from typing import Dict, List

class TableroCumplimientoSOC2:
    def __init__(self):
        self.colector_evidencia = ColectorEvidenciaSOC2()

    def create_dashboard(self):
        """Crear panel de control de Streamlit para el monitoreo de cumplimiento SOC2"""

        st.set_page_config(
            page_title="Panel de Control de Cumplimiento SOC2 Tipo II",
            page_icon="🔒",
            layout="wide"
        )

        st.title("🔒 Panel de Control de Cumplimiento SOC2 Tipo II")
        st.markdown("Monitoreo de cumplimiento en tiempo real y recopilación de evidencia")

        # Controles de la barra lateral
        st.sidebar.header("Controles del Panel de Control")

        # Selector de rango de fechas
        col1, col2 = st.sidebar.columns(2)
        with col1:
            start_date = st.date_input("Fecha de Inicio", datetime.now() - timedelta(days=90))
        with col2:
            end_date = st.date_input("Fecha de Fin", datetime.now())

# Botón de actualizar datos
if st.sidebar.button("Actualizar Evidencia"):
    with st.spinner("Recopilando evidencia..."):
        evidence = self.evidence_collector.collect_comprehensive_evidence(
            datetime.combine(start_date, datetime.min.time()),
            datetime.combine(end_date, datetime.min.time())
        )
        st.session_state['evidence'] = evidence

# Cargar datos de evidencia
if 'evidence' not in st.session_state:
    with st.spinner("Cargando evidencia inicial..."):
        evidence = self.evidence_collector.collect_comprehensive_evidence(
            datetime.combine(start_date, datetime.min.time()),
            datetime.combine(end_date, datetime.min.time())
        )
        st.session_state['evidence'] = evidence

evidence = st.session_state['evidence']

# Panel principal
self._create_overview_section(evidence)
self._create_compliance_metrics(evidence)
self._create_control_details(evidence)
self._create_evidence_timeline(evidence)

def _create_overview_section(self, evidence: Dict):
    """Crear sección de resumen con métricas clave"""

    st.header("📊 Resumen de Cumplimiento")

    # Calcular métricas generales
    total_controls = sum(len(ctrl['evidence_items']) for ctrl in evidence['control_evidence'].values())
    compliant_controls = sum(
        len([item for item in ctrl['evidence_items'] if item.get('automated', False)])
        for ctrl in evidence['control_evidence'].values()
    )

    overall_completeness = sum(
        ctrl['metrics']['evidence_completeness']
        for ctrl in evidence['control_evidence'].values()
    ) / len(evidence['control_evidence'])

        efectividad_general = sum(
            ctrl['metrics']['control_effectiveness']
            for ctrl in evidence['control_evidence'].values()
        ) / len(evidence['control_evidence'])

        # Mostrar métricas clave
        col1, col2, col3, col4 = st.columns(4)

        with col1:
            st.metric(
                label="Cumplimiento General",
                value=f"{overall_completeness:.1f}%",
                delta=f"{overall_completeness - 85:.1f}%" if overall_completeness >= 85 else f"{overall_completeness - 85:.1f}%"
            )

        with col2:
            st.metric(
                label="Efectividad del Control",
                value=f"{overall_effectiveness:.1f}%",
                delta=f"{overall_effectiveness - 80:.1f}%" if overall_effectiveness >= 80 else f"{overall_effectiveness - 80:.1f}%"
            )

        with col3:
            st.metric(
                label="Controles Totales",
                value=total_controls,
                delta=f"🤖 {compliant_controls} automatizados"
            )

        with col4:
            st.metric(
                label="Periodo de Evidencia",
                value=f"{(datetime.fromisoformat(evidence['collection_metadata']['evidence_period_end']) - datetime.fromisoformat(evidence['collection_metadata']['evidence_period_start'])).days} días",
                delta="Colección continua"
            )

    def _create_compliance_metrics(self, evidence: Dict):
        """Crear visualización de métricas de cumplimiento"""

        st.header("📈 Cumplimiento de Criterios de Servicio de Confianza")

# Preparar datos para visualización
criteria_data = []
for criteria_id, ctrl_evidence in evidence['control_evidence'].items():
    criteria_data.append({
        'Criterio': criteria_id,
        'Descripción': ctrl_evidence['criteria_description'],
        'Integridad': ctrl_evidence['metrics']['evidence_completeness'],
        'Efectividad': ctrl_evidence['metrics']['control_effectiveness'],
        'Estado': ctrl_evidence['compliance_status'],
        'Controles': len(ctrl_evidence['evidence_items'])
    })

df = pd.DataFrame(criteria_data)

# Crear mapa de calor de cumplimiento
fig_heatmap = px.imshow(
    df[['Completeness', 'Effectiveness']].T,
    labels=dict(x="Criterios de Servicio de Confianza", y="Métricas", color="Puntuación"),
    x=df['Criteria'],
    y=['Integridad de Evidencia', 'Efectividad de Control'],
    color_continuous_scale='RdYlGn',
    aspect="auto"
)
fig_heatmap.update_layout(title="Mapa de Calor de Cumplimiento por Criterios de Servicio de Confianza")
st.plotly_chart(fig_heatmap, use_container_width=True)

# Distribución del estado de cumplimiento
col1, col2 = st.columns(2)

with col1:
    status_counts = df['Status'].value_counts()
    fig_pie = px.pie(
        values=status_counts.values,
        names=status_counts.index,
        title="Distribución del Estado de Cumplimiento"
    )
    st.plotly_chart(fig_pie, use_container_width=True)

        con col2:
            fig_bar = px.bar(
                df,
                x='Criterios',
                y=['Integridad', 'Efectividad'],
                title="Puntuaciones de Cumplimiento por Criterios",
                barmode='group'
            )
            st.plotly_chart(fig_bar, use_container_width=True)

    def _crear_detalles_control(self, evidencia: Dict):
        """Crear información detallada del control"""

        st.header("🔍 Detalles del Control")

        # Permitir a los usuarios seleccionar criterios
        criterios_seleccionados = st.selectbox(
            "Seleccione Criterios de Servicio de Confianza",
            options=list(evidencia['control_evidence'].keys()),
            format_func=lambda x: f"{x}: {evidencia['control_evidence'][x]['criteria_description']}"
        )

        if criterios_seleccionados:
            evidencia_ctrl = evidencia['control_evidence'][criterios_seleccionados]

            # Mostrar información de criterios
            col1, col2 = st.columns(2)

            with col1:
                st.subheader(f"{selected_criteria}: {ctrl_evidence['criteria_description']}")
                st.write(f"**Estado de Cumplimiento:** {ctrl_evidence['compliance_status']}")
                st.write(f"**Elementos de Evidencia:** {len(ctrl_evidence['evidence_items'])}")
                st.write(f"**Integridad de la Evidencia:** {ctrl_evidence['metrics']['evidence_completeness']:.1f}%")
                st.write(f"**Efectividad del Control:** {ctrl_evidence['metrics']['control_effectiveness']:.1f}%")

            con col2:
                # Medidor de efectividad del control
                fig_gauge = go.Figure(go.Indicator(
                    mode = "gauge+number",
                    value = ctrl_evidence['metrics']['control_effectiveness'],
                    domain = {'x': [0, 1], 'y': [0, 1]},
                    title = {'text': "Efectividad del Control"},
                    gauge = {
                        'axis': {'range': [None, 100]},
                        'bar': {'color': "darkblue"},
                        'steps': [
                            {'range': [0, 50], 'color': "lightgray"},
                            {'range': [50, 80], 'color': "yellow"},
                            {'range': [80, 100], 'color': "green"}
                        ],
                        'threshold': {
                            'line': {'color': "red", 'width': 4},
                            'thickness': 0.75,
                            'value': 90
                        }
                    }
                ))
                st.plotly_chart(fig_gauge, use_container_width=True)

# Mostrar elementos de evidencia
st.subheader("Elementos de evidencia")

evidence_df = pd.DataFrame([
    {
        'ID de Control': item['control_id'],
        'Descripción': item['control_description'],
        'Tipo de Evidencia': item['evidence_type'],
        'Automatizado': '✅' if item['automated'] else '❌',
        'Hora de Recolección': item['collection_timestamp']
    }
    for item in ctrl_evidence['evidence_items']
])

st.dataframe(evidence_df, use_container_width=True)

# Mostrar datos de evidencia para el control seleccionado
selected_control = st.selectbox(
    "Ver datos de evidencia para el control:",
    options=[item['control_id'] for item in ctrl_evidence['evidence_items']]
)

            if selected_control:
                control_item = next(
                    item for item in ctrl_evidence['evidence_items']
                    if item['control_id'] == selected_control
                )

                with st.expander(f"Datos de Evidencia para {selected_control}"):
                    st.json(control_item['evidence_data'])

    def _create_evidence_timeline(self, evidence: Dict):
        """Crear línea de tiempo de recolección de evidencia"""

        st.header("📅 Línea de Tiempo de Recolección de Evidencia")

# Preparar datos de la línea de tiempo

timeline_data = []
for criteria_id, ctrl_evidence in evidence['control_evidence'].items():
    for item in ctrl_evidence['evidence_items']:
        timeline_data.append({
            'timestamp': datetime.fromisoformat(item['collection_timestamp']),
            'criteria': criteria_id,
            'control_id': item['control_id'],
            'evidence_type': item['evidence_type'],
            'automated': item['automated']
        })

if timeline_data:
    df_timeline = pd.DataFrame(timeline_data)

# Crear gráfico de línea de tiempo
fig_timeline = px.scatter(
 df_timeline,
 x='timestamp',
 y='criteria',
 color='evidence_type',
 symbol='automated',
 title="Cronograma de Recolección de Evidencia",
 hover_data=['control_id', 'evidence_type']
)
fig_timeline.update_layout(height=400)
st.plotly_chart(fig_timeline, use_container_width=True)

# Resumen de recolección de evidencia
st.subheader("Resumen de Recolección")
col1, col2 = st.columns(2)

with col1:
 st.write(f"**Fecha de Recolección:** {evidence['collection_metadata']['collection_date']}")
 st.write(f"**Cuenta de AWS:** {evidence['collection_metadata']['aws_account_id']}")
 st.write(f"**Versión del Colector:** {evidence['collection_metadata']['collector_version']}")

        with col2:
            st.write(f"**Hash de Evidencia:** {evidence['integrity']['evidence_hash'][:16]}...")
            st.write(f"**Tamaño de Evidencia:** {evidence['integrity']['evidence_size_bytes']:,} bytes")
            st.write(f"**Algoritmo de Integridad:** {evidence['integrity']['integrity_algorithm']}")

if __name__ == "__main__":
    dashboard = SOC2ComplianceDashboard()
    dashboard.create_dashboard()

Automatización Continua de Cumplimiento

Integración CI/CD para Cumplimiento SOC2

1. Flujo de Trabajo de Cumplimiento SOC2 en GitHub Actions

# .github/workflows/soc2-compliance.yml
name: Validación de Cumplimiento SOC2

on:
  schedule:
    - cron: '0 0 * * *' # Verificación diaria de cumplimiento
  push:
    branches: [main]
    paths: ['infrastructure/**', 'terraform/**']
  workflow_dispatch:
    inputs:
      evidence_period_days:
        description: 'Período de recolección de evidencia en días'
        required: false
        default: '30'

env:
  AWS_DEFAULT_REGION: us-east-1
  TERRAFORM_VERSION: '1.6.6'

jobs:
  infrastructure-compliance:
    name: Validación de Cumplimiento de Infraestructura
    runs-on: ubuntu-latest
    permissions:
      id-token: write
      contents: read
      security-events: write

    steps:
      - name: Checkout Code
        uses: actions/checkout@v4

      - name: Configurar Credenciales de AWS
        uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: ${{ secrets.AWS_OIDC_ROLE }}
          aws-region: ${{ env.AWS_DEFAULT_REGION }}

      - name: Configurar Terraform
        uses: hashicorp/setup-terraform@v3
        with:
          terraform_version: ${{ env.TERRAFORM_VERSION }}

      - name: Validar Configuraciones de Terraform para SOC2
        run: |
          # Validar todas las configuraciones de Terraform para el cumplimiento de SOC2
          find infrastructure/ -name "*.tf" -type f | while read tf_file; do
            echo "Validando cumplimiento de SOC2 para: $tf_file"

            # Verificar etiquetas requeridas de SOC2
            if ! grep -q "SOC2:Control" "$tf_file"; then
              echo "❌ Falta la etiqueta SOC2:Control en $tf_file"
              exit 1
            fi

            # Verificar configuraciones de cifrado
            if grep -q "aws_s3_bucket\|aws_ebs_volume\|aws_rds" "$tf_file"; then
              if ! grep -q "kms_key_id\|encryption" "$tf_file"; then
                echo "⚠️ Posible problema de cifrado en $tf_file"
              fi
            fi

            # Verificar configuraciones de registro
            if grep -q "aws_vpc\|aws_lb" "$tf_file"; then
              if ! grep -q "flow_log\|access_logs" "$tf_file"; then
                echo "⚠️ Posible problema de registro en $tf_file"
              fi
            fi
          done

      - name: Plan de Terraform con Validación SOC2
        run: |
          cd infraestructura/
          terraform init
          terraform plan -out=tfplan.binary
          terraform show -json tfplan.binary > tfplan.json

      - name: Escaneo de Cumplimiento SOC2
        run: |
          # Instalar herramientas de escaneo de cumplimiento
          pip install boto3 pandas streamlit plotly

          # Ejecutar recolección de evidencia SOC2
          python3 << 'EOF'
          import sys
          import os
          sys.path.append('scripts/')

          from soc2_evidence_collector import SOC2EvidenceCollector
          from datetime import datetime, timedelta

          # Recoger evidencia
          collector = SOC2EvidenceCollector()

          evidence_days = int(os.getenv('EVIDENCE_PERIOD_DAYS', '30'))
          end_date = datetime.now()
          start_date = end_date - timedelta(days=evidence_days)

          evidence = collector.collect_comprehensive_evidence(start_date, end_date)

# Generar informe de cumplimiento
report = collector.generate_audit_report(evidence)

# Guardar artefactos
import json
with open('soc2-evidence.json', 'w') as f:
    json.dump(evidence, f, indent=2, default=str)

with open('soc2-compliance-report.md', 'w') as f:
    f.write(report)

# Verificar umbrales de cumplimiento
overall_completeness = sum(
    ctrl['metrics']['evidence_completeness']
    for ctrl in evidence['control_evidence'].values()
) / len(evidence['control_evidence'])

print(f"Cumplimiento general: {overall_completeness:.1f}%")

if overall_completeness < 85:
    print("❌ Cumplimiento por debajo del umbral (85%)")
    sys.exit(1)
else:
    print("✅ Cumplimiento cumple con el umbral")
EOF
env:
  EVIDENCE_PERIOD_DAYS: ${{ github.event.inputs.evidence_period_days || '30' }}

- name: Cargar artefactos de evidencia SOC2
  uses: actions/upload-artifact@v3
  with:
    name: evidencia-de-cumplimiento-soc2
    path: |
      soc2-evidence.json
      soc2-compliance-report.md
      tfplan.json
    retention-days: 2557 # 7 años para SOC2

  - name: Crear Problema de Cumplimiento
    if: failure()
    uses: actions/github-script@v6
    with:
      script: |
        github.rest.issues.create({
          owner: context.repo.owner,
          repo: context.repo.repo,
          title: `Fallo de Cumplimiento SOC2 - ${new Date().toISOString()}`,
          body: `## Fallo en la Verificación de Cumplimiento SOC2

          **Ejecución del Flujo de Trabajo:** ${{ github.run_id }}
          **Rama:** ${{ github.ref }}
          **Commit:** ${{ github.sha }}

          Por favor, revise el informe de cumplimiento y aborde cualquier problema.`,
          labels: ['cumplimiento', 'soc2', 'urgente']
        });

evidence-archive:
  name: Archivar Evidencia para Auditoría
  runs-on: ubuntu-latest
  needs: infrastructure-compliance
  if: github.ref == 'refs/heads/main'

    pasos:
      - nombre: Descargar artefactos de evidencia
        usa: actions/download-artifact@v3
        con:
          nombre: soc2-compliance-evidence

      - nombre: Configurar credenciales de AWS
        usa: aws-actions/configure-aws-credentials@v4
        con:
          rol-a-asumir: ${{ secrets.AWS_OIDC_ROLE }}
          aws-region: ${{ env.AWS_DEFAULT_REGION }}

      - nombre: Archivar evidencia en S3
        ejecutar: |
          # Crear archivo con marca de tiempo
          TIMESTAMP=$(date +"%Y-%m-%d_%H-%M-%S")
          ARCHIVE_PREFIX="soc2-evidence/${TIMESTAMP}"

          # Subir evidencia con metadatos adecuados
          aws s3 cp soc2-evidence.json s3://${{ secrets.SOC2_EVIDENCE_BUCKET }}/${ARCHIVE_PREFIX}/evidence.json \
            --metadata "compliance-framework=SOC2-Type-II,collection-date=${TIMESTAMP},retention-years=7"

          aws s3 cp soc2-compliance-report.md s3://${{ secrets.SOC2_EVIDENCE_BUCKET }}/${ARCHIVE_PREFIX}/report.md \
            --metadata "compliance-framework=SOC2-Type-II,collection-date=${TIMESTAMP},retention-years=7"

          # Crear hash de integridad de evidencia
          sha256sum soc2-evidence.json > evidence-integrity.sha256
          aws s3 cp evidence-integrity.sha256 s3://${{ secrets.SOC2_EVIDENCE_BUCKET }}/${ARCHIVE_PREFIX}/integrity.sha256

          echo "Evidencia archivada en: s3://${{ secrets.SOC2_EVIDENCE_BUCKET }}/${ARCHIVE_PREFIX}/"

## Impacto en el Negocio y ROI

### Análisis de ROI de Automatización SOC2

**Costos de Implementación vs. Beneficios:**

| Categoría                     | SOC2 Manual | SOC2 Automatizado | Ahorros   |
| ---------------------------- | ----------- | ----------------- | --------- |
| **Preparación Anual de Auditoría** | 2,000 horas | 400 horas        | $240K     |
| **Recolección de Evidencia**      | 800 horas   | 80 horas         | $108K     |
| **Monitoreo de Cumplimiento**    | 1,200 horas | 200 horas        | $150K     |
| **Duración de la Auditoría**           | 8 semanas   | 3 semanas       | $75K      |
| **Esfuerzo de Documentación**     | 600 horas   | 100 horas        | $75K      |
| **Ahorros Anuales Totales**     | -           | -                | **$648K** |

**Cálculo de ROI:**

```bash
# Valor anual de automatización SOC2
AUDIT_PREPARATION_SAVINGS = 240000      # Tiempo de preparación reducido
EVIDENCE_COLLECTION_SAVINGS = 108000    # Recolección de evidencia automatizada
CONTINUOUS_MONITORING_SAVINGS = 150000  # Cumplimiento en tiempo real
AUDIT_DURATION_SAVINGS = 75000          # Finalización de auditoría más rápida
DOCUMENTATION_SAVINGS = 75000           # Reporte automatizado

TOTAL_ANNUAL_SAVINGS = AUDIT_PREPARATION_SAVINGS + EVIDENCE_COLLECTION_SAVINGS + CONTINUOUS_MONITORING_SAVINGS + AUDIT_DURATION_SAVINGS + DOCUMENTATION_SAVINGS

Ahorros Totales: $648,000 anuales

IMPLEMENTATION_COST = 150000 # Configuración inicial de automatización ANNUAL_MAINTENANCE = 30000 # Mantenimiento continuo

FIRST_YEAR_ROI = ((TOTAL_ANNUAL_SAVINGS - IMPLEMENTATION_COST - ANNUAL_MAINTENANCE) / (IMPLEMENTATION_COST + ANNUAL_MAINTENANCE)) * 100

ROI: 260% en el primer año

ONGOING_ROI = ((TOTAL_ANNUAL_SAVINGS - ANNUAL_MAINTENANCE) / ANNUAL_MAINTENANCE) * 100

ROI continuo: 2,060% anualmente


## Conclusión

La automatización de SOC2 Tipo II transforma el cumplimiento de un proceso anual oneroso en una capacidad empresarial continua que agrega valor. Al implementar Infraestructura como Código con controles de seguridad integrados, recolección automatizada de evidencia y monitoreo continuo de cumplimiento, las organizaciones logran tanto el cumplimiento regulatorio como una mejora en la postura de seguridad.

La clave para una automatización exitosa de SOC2 radica en integrar el cumplimiento en sus flujos de trabajo de desarrollo y operaciones desde el principio, en lugar de tratarlo como un ejercicio de auditoría posterior. Este enfoque reduce los costos de cumplimiento mientras mejora los resultados reales de seguridad.

Recuerde que la automatización de SOC2 no se trata de marcar casillas, sino de construir sistemas que proporcionen una garantía continua sobre sus controles de seguridad y procesos empresariales.

**Su viaje de automatización de SOC2 comienza implementando infraestructura como código con etiquetado de cumplimiento. Comience hoy y avance hacia la validación continua de cumplimiento.**