SOC2 Type II Automation: Evidence Collection with Infrastructure as Code

The SOC2 Type II Automation Challenge

Your organization undergoes SOC2 Type II audits annually, requiring months of manual evidence collection, policy documentation, and control validation across hundreds of systems and processes. Auditors request evidence spanning 12 months of operations, forcing your team to retroactively gather logs, screenshots, and documentation that may be incomplete or inconsistent. This manual approach creates audit anxiety, consumes significant resources, and provides limited assurance about actual security posture.

SOC2 Type II automation transforms compliance from a periodic scramble into continuous validation, providing real-time evidence collection, automated policy enforcement, and comprehensive audit trails that reduce audit time from months to weeks while improving actual security posture.

SOC2 Type II Framework for DevSecOps

SOC2 Type II evaluates the effectiveness of controls over time, requiring evidence of consistent implementation across all Trust Service Criteria: Security, Availability, Processing Integrity, Confidentiality, and Privacy. Modern cloud-native environments enable automated evidence collection and continuous compliance validation.

Core SOC2 Type II Automation Components

1. Continuous Evidence Collection

  • Automated log aggregation and retention across all systems
  • Real-time policy compliance monitoring and validation
  • Configuration management and drift detection
  • Identity and access management audit trails

2. Automated Control Implementation

  • Infrastructure as Code with embedded security controls
  • Policy as Code for automated governance enforcement
  • Continuous security monitoring and alerting
  • Automated remediation and response workflows

3. Audit Trail Generation

  • Immutable audit logs with cryptographic verification
  • Automated compliance reporting and dashboards
  • Evidence packaging and audit support automation
  • Historical compliance analysis and trending
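The "immutable audit logs with cryptographic verification" item above can be sketched as a hash chain, where each log entry commits to the hash of the previous one, so any retroactive edit invalidates every later entry. The following is a minimal illustration only, independent of any particular logging backend; the function names are ours, not part of any product:

```python
import hashlib
import json
from typing import Dict, List


def append_entry(chain: List[Dict], event: Dict) -> List[Dict]:
    """Append an event to a hash-chained audit log.

    Each entry stores the SHA-256 of the previous entry, so modifying
    any historical record invalidates every subsequent hash.
    """
    prev_hash = chain[-1]["hash"] if chain else "0" * 64
    body = {"event": event, "prev_hash": prev_hash}
    digest = hashlib.sha256(
        json.dumps(body, sort_keys=True).encode()
    ).hexdigest()
    chain.append({**body, "hash": digest})
    return chain


def verify_chain(chain: List[Dict]) -> bool:
    """Recompute every link; return False if any entry was altered."""
    prev_hash = "0" * 64
    for entry in chain:
        body = {"event": entry["event"], "prev_hash": entry["prev_hash"]}
        digest = hashlib.sha256(
            json.dumps(body, sort_keys=True).encode()
        ).hexdigest()
        if entry["prev_hash"] != prev_hash or entry["hash"] != digest:
            return False
        prev_hash = entry["hash"]
    return True
```

In practice the same property is usually obtained from managed services (for example, CloudTrail log file integrity validation), but the mechanism is the one shown here.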

Infrastructure as Code for SOC2 Compliance

Infrastructure as Code provides the foundation for consistent, auditable, and repeatable security control implementation across all environments.

SOC2-Compliant Infrastructure Templates

1. Security-Focused Terraform Modules

# terraform/modules/soc2-compliant-vpc/main.tf
terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
  }
}

# SOC2 Security Control: Network Segmentation (CC6.1)
resource "aws_vpc" "soc2_vpc" {
  cidr_block           = var.vpc_cidr
  enable_dns_hostnames = true
  enable_dns_support   = true

  tags = merge(var.common_tags, {
    Name                    = "${var.environment}-soc2-vpc"
    "SOC2:Control"         = "CC6.1"
    "SOC2:Description"     = "Network segmentation and access controls"
    "Compliance:Framework" = "SOC2-Type-II"
    "Audit:Required"       = "true"
  })
}

# SOC2 Security Control: VPC Flow Logs (CC7.2)
resource "aws_flow_log" "vpc_flow_log" {
  iam_role_arn    = aws_iam_role.flow_log_role.arn
  log_destination = aws_cloudwatch_log_group.vpc_flow_log.arn
  traffic_type    = "ALL"
  vpc_id          = aws_vpc.soc2_vpc.id

  tags = merge(var.common_tags, {
    Name                    = "${var.environment}-vpc-flow-logs"
    "SOC2:Control"         = "CC7.2"
    "SOC2:Description"     = "Network monitoring and logging"
    "Audit:RetentionDays"  = "2557"  # 7 years for SOC2
  })
}

# SOC2 Security Control: Encrypted CloudWatch Logs (CC6.7)
resource "aws_cloudwatch_log_group" "vpc_flow_log" {
  name              = "/aws/vpc/flowlogs/${var.environment}"
  retention_in_days = 2557  # 7 years retention for SOC2
  kms_key_id        = aws_kms_key.soc2_logging_key.arn

  tags = merge(var.common_tags, {
    "SOC2:Control"     = "CC6.7"
    "SOC2:Description" = "Encrypted audit logging"
    "Audit:Critical"   = "true"
  })
}

# SOC2 Security Control: Encryption Key Management (CC6.7)
resource "aws_kms_key" "soc2_logging_key" {
  description             = "SOC2 compliance logging encryption key"
  deletion_window_in_days = 30
  enable_key_rotation     = true

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "Enable IAM User Permissions"
        Effect = "Allow"
        Principal = {
          AWS = "arn:aws:iam::${data.aws_caller_identity.current.account_id}:root"
        }
        Action   = "kms:*"
        Resource = "*"
      },
      {
        Sid    = "Allow CloudWatch Logs"
        Effect = "Allow"
        Principal = {
          Service = "logs.${data.aws_region.current.name}.amazonaws.com"
        }
        Action = [
          "kms:Encrypt",
          "kms:Decrypt",
          "kms:ReEncrypt*",
          "kms:GenerateDataKey*",
          "kms:DescribeKey"
        ]
        Resource = "*"
        Condition = {
          ArnEquals = {
            "kms:EncryptionContext:aws:logs:arn" = "arn:aws:logs:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:log-group:/aws/vpc/flowlogs/${var.environment}"
          }
        }
      }
    ]
  })

  tags = merge(var.common_tags, {
    Name                    = "${var.environment}-soc2-logging-key"
    "SOC2:Control"         = "CC6.7"
    "SOC2:Description"     = "Encryption key for compliance logging"
    "Audit:KeyRotation"    = "enabled"
  })
}

resource "aws_kms_alias" "soc2_logging_key_alias" {
  name          = "alias/${var.environment}-soc2-logging"
  target_key_id = aws_kms_key.soc2_logging_key.key_id
}

# SOC2 Security Control: Private Subnets (CC6.1)
resource "aws_subnet" "private_subnets" {
  count = length(var.private_subnet_cidrs)

  vpc_id            = aws_vpc.soc2_vpc.id
  cidr_block        = var.private_subnet_cidrs[count.index]
  availability_zone = data.aws_availability_zones.available.names[count.index]

  map_public_ip_on_launch = false

  tags = merge(var.common_tags, {
    Name                    = "${var.environment}-private-subnet-${count.index + 1}"
    Type                    = "Private"
    "SOC2:Control"         = "CC6.1"
    "SOC2:Description"     = "Isolated private network segment"
    "kubernetes.io/role/internal-elb" = "1"
  })
}

# SOC2 Security Control: Public Subnets with NACLs (CC6.1)
resource "aws_subnet" "public_subnets" {
  count = length(var.public_subnet_cidrs)

  vpc_id            = aws_vpc.soc2_vpc.id
  cidr_block        = var.public_subnet_cidrs[count.index]
  availability_zone = data.aws_availability_zones.available.names[count.index]

  map_public_ip_on_launch = true

  tags = merge(var.common_tags, {
    Name                    = "${var.environment}-public-subnet-${count.index + 1}"
    Type                    = "Public"
    "SOC2:Control"         = "CC6.1"
    "SOC2:Description"     = "Controlled public network access"
    "kubernetes.io/role/elb" = "1"
  })
}

# SOC2 Security Control: Network ACLs (CC6.1)
resource "aws_network_acl" "private_nacl" {
  vpc_id     = aws_vpc.soc2_vpc.id
  subnet_ids = aws_subnet.private_subnets[*].id

  # Allow inbound HTTPS from public subnets
  ingress {
    protocol   = "tcp"
    rule_no    = 100
    action     = "allow"
    cidr_block = var.vpc_cidr
    from_port  = 443
    to_port    = 443
  }

  # Allow inbound HTTP from public subnets (for health checks)
  ingress {
    protocol   = "tcp"
    rule_no    = 110
    action     = "allow"
    cidr_block = var.vpc_cidr
    from_port  = 80
    to_port    = 80
  }

  # Allow ephemeral ports for responses
  ingress {
    protocol   = "tcp"
    rule_no    = 120
    action     = "allow"
    cidr_block = "0.0.0.0/0"
    from_port  = 1024
    to_port    = 65535
  }

  # Allow all outbound traffic
  egress {
    protocol   = "-1"
    rule_no    = 100
    action     = "allow"
    cidr_block = "0.0.0.0/0"
    from_port  = 0
    to_port    = 0
  }

  tags = merge(var.common_tags, {
    Name                    = "${var.environment}-private-nacl"
    "SOC2:Control"         = "CC6.1"
    "SOC2:Description"     = "Network access control list for private subnets"
  })
}

# SOC2 Security Control: IAM Role for Flow Logs (CC6.2)
resource "aws_iam_role" "flow_log_role" {
  name = "${var.environment}-vpc-flow-log-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "vpc-flow-logs.amazonaws.com"
        }
      }
    ]
  })

  tags = merge(var.common_tags, {
    "SOC2:Control"     = "CC6.2"
    "SOC2:Description" = "IAM role for VPC flow logs service"
  })
}

resource "aws_iam_role_policy" "flow_log_policy" {
  name = "${var.environment}-vpc-flow-log-policy"
  role = aws_iam_role.flow_log_role.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents",
          "logs:DescribeLogGroups",
          "logs:DescribeLogStreams"
        ]
        Effect   = "Allow"
        # Include ":*" so the policy covers log streams within the group,
        # which logs:PutLogEvents targets
        Resource = "${aws_cloudwatch_log_group.vpc_flow_log.arn}:*"
      }
    ]
  })
}

# Data sources
data "aws_caller_identity" "current" {}
data "aws_region" "current" {}
data "aws_availability_zones" "available" {
  state = "available"
}

# Variables
variable "environment" {
  description = "Environment name (e.g., production, staging)"
  type        = string
}

variable "vpc_cidr" {
  description = "CIDR block for VPC"
  type        = string
  default     = "10.0.0.0/16"
}

variable "private_subnet_cidrs" {
  description = "CIDR blocks for private subnets"
  type        = list(string)
  default     = ["10.0.1.0/24", "10.0.2.0/24", "10.0.3.0/24"]
}

variable "public_subnet_cidrs" {
  description = "CIDR blocks for public subnets"
  type        = list(string)
  default     = ["10.0.101.0/24", "10.0.102.0/24", "10.0.103.0/24"]
}

variable "common_tags" {
  description = "Common tags for all resources"
  type        = map(string)
  default = {
    "Terraform"            = "true"
    "Compliance:Framework" = "SOC2-Type-II"
    "Environment"          = "production"
  }
}

# Outputs
output "vpc_id" {
  description = "ID of the VPC"
  value       = aws_vpc.soc2_vpc.id
}

output "private_subnet_ids" {
  description = "IDs of the private subnets"
  value       = aws_subnet.private_subnets[*].id
}

output "public_subnet_ids" {
  description = "IDs of the public subnets"
  value       = aws_subnet.public_subnets[*].id
}

output "flow_log_group_name" {
  description = "Name of the VPC flow log CloudWatch log group"
  value       = aws_cloudwatch_log_group.vpc_flow_log.name
}

output "soc2_compliance_tags" {
  description = "SOC2 compliance tags applied to resources"
  value = {
    for key, value in var.common_tags : key => value
    if can(regex("^(SOC2|Compliance|Audit):", key))
  }
}
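Because every resource in the module above carries SOC2:*, Compliance:*, and Audit:* tags, tag coverage itself becomes auditable: compliance tooling can mechanically flag resources missing the expected keys. A minimal sketch of such a check — the function name and input shape are illustrative, not part of the module:

```python
from typing import Dict, List

# Required keys mirror the tagging scheme used in the Terraform module above
REQUIRED_TAGS = ["SOC2:Control", "SOC2:Description", "Compliance:Framework"]


def missing_soc2_tags(resources: Dict[str, Dict[str, str]],
                      required: List[str] = REQUIRED_TAGS) -> Dict[str, List[str]]:
    """Return, per resource, the required compliance tags it lacks.

    `resources` maps a resource identifier to its tag map -- for example
    `terraform show -json` output or the AWS Resource Groups Tagging API
    response flattened into {arn: {key: value}}.
    """
    report = {}
    for resource_id, tags in resources.items():
        absent = [tag for tag in required if tag not in tags]
        if absent:
            report[resource_id] = absent
    return report
```

Running a check like this in CI turns the tagging convention from documentation into an enforced control.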

2. SOC2 Application Security Module

# terraform/modules/soc2-application/main.tf

# SOC2 Security Control: ECS Cluster with Security Configuration (CC6.1)
resource "aws_ecs_cluster" "soc2_cluster" {
  name = "${var.application_name}-${var.environment}"

  setting {
    name  = "containerInsights"
    value = "enabled"
  }

  configuration {
    execute_command_configuration {
      kms_key_id = aws_kms_key.ecs_exec_key.arn
      logging    = "OVERRIDE"

      log_configuration {
        cloud_watch_encryption_enabled = true
        cloud_watch_log_group_name     = aws_cloudwatch_log_group.ecs_exec_logs.name
      }
    }
  }

  tags = merge(var.common_tags, {
    Name                    = "${var.application_name}-${var.environment}-cluster"
    "SOC2:Control"         = "CC6.1"
    "SOC2:Description"     = "Secure container orchestration platform"
    "Audit:ContainerInsights" = "enabled"
  })
}

# SOC2 Security Control: Application Load Balancer with SSL (CC6.7)
resource "aws_lb" "application_lb" {
  name               = "${var.application_name}-${var.environment}-alb"
  internal           = false
  load_balancer_type = "application"
  security_groups    = [aws_security_group.alb_sg.id]
  subnets            = var.public_subnet_ids

  enable_deletion_protection = var.environment == "production"
  enable_http2               = true

  access_logs {
    bucket  = aws_s3_bucket.alb_logs.bucket
    prefix  = "alb-logs"
    enabled = true
  }

  tags = merge(var.common_tags, {
    Name                    = "${var.application_name}-${var.environment}-alb"
    "SOC2:Control"         = "CC6.7"
    "SOC2:Description"     = "Encrypted application load balancer"
    "Audit:AccessLogs"     = "enabled"
  })
}

# SOC2 Security Control: S3 Bucket for ALB Logs (CC7.2)
resource "aws_s3_bucket" "alb_logs" {
  bucket        = "${var.application_name}-${var.environment}-alb-logs-${random_string.bucket_suffix.result}"
  force_destroy = var.environment != "production"

  tags = merge(var.common_tags, {
    Name                    = "${var.application_name}-${var.environment}-alb-logs"
    "SOC2:Control"         = "CC7.2"
    "SOC2:Description"     = "Access logs for application load balancer"
    "Audit:RetentionYears" = "7"
  })
}

resource "aws_s3_bucket_versioning" "alb_logs" {
  bucket = aws_s3_bucket.alb_logs.id
  versioning_configuration {
    status = "Enabled"
  }
}

resource "aws_s3_bucket_server_side_encryption_configuration" "alb_logs" {
  bucket = aws_s3_bucket.alb_logs.id

  # ALB access log delivery supports only SSE-S3 (AES256), not
  # customer-managed KMS keys. The bucket also needs a bucket policy
  # granting the ELB log delivery service write access.
  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm = "AES256"
    }
  }
}

resource "aws_s3_bucket_lifecycle_configuration" "alb_logs" {
  bucket = aws_s3_bucket.alb_logs.id

  rule {
    id     = "soc2_compliance_retention"
    status = "Enabled"

    # Apply the rule to all objects in the bucket
    filter {}

    transition {
      days          = 30
      storage_class = "STANDARD_IA"
    }

    transition {
      days          = 90
      storage_class = "GLACIER"
    }

    transition {
      days          = 365
      storage_class = "DEEP_ARCHIVE"
    }

    expiration {
      days = 2557  # 7 years for SOC2 compliance
    }
  }
}

# SOC2 Security Control: ECS Task Definition with Security Context (CC6.1)
resource "aws_ecs_task_definition" "app_task" {
  family                   = "${var.application_name}-${var.environment}"
  network_mode             = "awsvpc"
  requires_compatibilities = ["FARGATE"]
  cpu                      = var.task_cpu
  memory                   = var.task_memory
  execution_role_arn       = aws_iam_role.ecs_execution_role.arn
  task_role_arn           = aws_iam_role.ecs_task_role.arn

  container_definitions = jsonencode([
    {
      name      = var.application_name
      image     = var.container_image
      essential = true

      portMappings = [
        {
          containerPort = var.container_port
          protocol      = "tcp"
        }
      ]

      # SOC2 Security Control: Application Logging (CC7.2)
      logConfiguration = {
        logDriver = "awslogs"
        options = {
          awslogs-group         = aws_cloudwatch_log_group.app_logs.name
          awslogs-region        = data.aws_region.current.name
          awslogs-stream-prefix = "ecs"
        }
      }

      # SOC2 Security Control: Environment Variable Security
      environment = [
        {
          name  = "ENVIRONMENT"
          value = var.environment
        },
        {
          name  = "LOG_LEVEL"
          value = var.environment == "production" ? "INFO" : "DEBUG"
        }
      ]

      # SOC2 Security Control: Secrets Management (CC6.7)
      secrets = [
        {
          name      = "DATABASE_PASSWORD"
          valueFrom = aws_secretsmanager_secret.app_secrets.arn
        }
      ]

      # SOC2 Security Control: Container Security
      readonlyRootFilesystem = true
      user                   = "1001"  # Non-root user

      healthCheck = {
        command     = ["CMD-SHELL", "curl -f http://localhost:${var.container_port}/health || exit 1"]
        interval    = 30
        timeout     = 5
        retries     = 3
        startPeriod = 60
      }
    }
  ])

  tags = merge(var.common_tags, {
    Name                    = "${var.application_name}-${var.environment}-task"
    "SOC2:Control"         = "CC6.1"
    "SOC2:Description"     = "Secure container task definition"
    "Audit:ReadOnlyRoot"   = "true"
    "Audit:NonRootUser"    = "true"
  })
}

# SOC2 Security Control: CloudWatch Log Groups with Encryption (CC6.7)
resource "aws_cloudwatch_log_group" "app_logs" {
  name              = "/ecs/${var.application_name}-${var.environment}"
  retention_in_days = 2557  # 7 years for SOC2
  kms_key_id        = aws_kms_key.cloudwatch_key.arn

  tags = merge(var.common_tags, {
    Name                    = "${var.application_name}-${var.environment}-logs"
    "SOC2:Control"         = "CC6.7"
    "SOC2:Description"     = "Encrypted application logs"
    "Audit:RetentionDays"  = "2557"
  })
}

resource "aws_cloudwatch_log_group" "ecs_exec_logs" {
  name              = "/ecs/exec/${var.application_name}-${var.environment}"
  retention_in_days = 90  # 90 days for ECS Exec session logs
  kms_key_id        = aws_kms_key.ecs_exec_key.arn

  tags = merge(var.common_tags, {
    Name                    = "${var.application_name}-${var.environment}-exec-logs"
    "SOC2:Control"         = "CC6.2"
    "SOC2:Description"     = "ECS Exec session audit logs"
    "Audit:AccessType"     = "administrative"
  })
}

# SOC2 Security Control: KMS Keys for Encryption (CC6.7)
resource "aws_kms_key" "cloudwatch_key" {
  description             = "KMS key for CloudWatch logs encryption"
  deletion_window_in_days = 30
  enable_key_rotation     = true

  tags = merge(var.common_tags, {
    Name                    = "${var.application_name}-${var.environment}-cloudwatch-key"
    "SOC2:Control"         = "CC6.7"
    "SOC2:Description"     = "Encryption key for CloudWatch logs"
    "Audit:KeyRotation"    = "enabled"
  })
}

resource "aws_kms_key" "ecs_exec_key" {
  description             = "KMS key for ECS Exec encryption"
  deletion_window_in_days = 30
  enable_key_rotation     = true

  tags = merge(var.common_tags, {
    Name                    = "${var.application_name}-${var.environment}-ecs-exec-key"
    "SOC2:Control"         = "CC6.7"
    "SOC2:Description"     = "Encryption key for ECS Exec sessions"
  })
}

resource "aws_kms_key" "s3_key" {
  description             = "KMS key for S3 encryption"
  deletion_window_in_days = 30
  enable_key_rotation     = true

  tags = merge(var.common_tags, {
    Name                    = "${var.application_name}-${var.environment}-s3-key"
    "SOC2:Control"         = "CC6.7"
    "SOC2:Description"     = "Encryption key for S3 buckets"
  })
}

# SOC2 Security Control: Secrets Manager (CC6.7)
resource "aws_secretsmanager_secret" "app_secrets" {
  name                    = "${var.application_name}-${var.environment}-secrets"
  description             = "Application secrets for ${var.application_name}"
  kms_key_id             = aws_kms_key.secrets_key.arn
  recovery_window_in_days = var.environment == "production" ? 30 : 0

  tags = merge(var.common_tags, {
    Name                    = "${var.application_name}-${var.environment}-secrets"
    "SOC2:Control"         = "CC6.7"
    "SOC2:Description"     = "Encrypted secrets management"
    "Audit:SecretRotation" = "enabled"
  })
}

resource "aws_kms_key" "secrets_key" {
  description             = "KMS key for Secrets Manager"
  deletion_window_in_days = 30
  enable_key_rotation     = true

  tags = merge(var.common_tags, {
    Name                    = "${var.application_name}-${var.environment}-secrets-key"
    "SOC2:Control"         = "CC6.7"
    "SOC2:Description"     = "Encryption key for secrets"
  })
}

# Random string for unique bucket naming
resource "random_string" "bucket_suffix" {
  length  = 8
  special = false
  upper   = false
}

# Data sources
data "aws_region" "current" {}
data "aws_caller_identity" "current" {}
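Both modules pin audit-log retention at 2,557 days (seven years, leap days included). That invariant is easy to verify offline against exported log-group descriptions; the sketch below assumes input in the shape of the CloudWatch Logs `describe_log_groups` response, and the function name is ours:

```python
from typing import Dict, List

SOC2_RETENTION_DAYS = 2557  # 7 years, matching the Terraform modules above


def under_retained_groups(log_groups: List[Dict],
                          minimum_days: int = SOC2_RETENTION_DAYS) -> List[str]:
    """Return names of log groups whose retention falls short of the minimum.

    A missing `retentionInDays` key means "never expire", which
    satisfies any minimum, so such groups are not flagged.
    """
    failing = []
    for group in log_groups:
        retention = group.get("retentionInDays")
        if retention is not None and retention < minimum_days:
            failing.append(group["logGroupName"])
    return failing
```

A real check would scope itself to audit-tagged groups, since some groups (like the 90-day ECS Exec logs above) intentionally use shorter windows.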

Automated SOC2 Evidence Collection

Continuous Compliance Monitoring System

1. SOC2 Evidence Collection Automation

#!/usr/bin/env python3
# soc2-automation/evidence_collector.py

import boto3
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import pandas as pd
from pathlib import Path
import hashlib

class SOC2EvidenceCollector:
    def __init__(self, aws_profile: str = None):
        self.session = boto3.Session(profile_name=aws_profile)
        self.logger = logging.getLogger(__name__)

        # Initialize AWS clients
        self.cloudtrail = self.session.client('cloudtrail')
        self.cloudwatch = self.session.client('cloudwatch')
        self.logs = self.session.client('logs')
        self.config = self.session.client('config')
        self.iam = self.session.client('iam')
        self.s3 = self.session.client('s3')

        # SOC2 Trust Service Criteria mapping
        self.trust_criteria = {
            'CC1': 'Control Environment',
            'CC2': 'Communication and Information',
            'CC3': 'Risk Assessment',
            'CC4': 'Monitoring Activities',
            'CC5': 'Control Activities',
            'CC6': 'Logical and Physical Access Controls',
            'CC7': 'System Operations',
            'CC8': 'Change Management',
            'CC9': 'Risk Mitigation'
        }

    def collect_comprehensive_evidence(self, start_date: datetime,
                                     end_date: datetime) -> Dict:
        """Collect comprehensive SOC2 Type II evidence"""

        evidence_package = {
            'collection_metadata': {
                'collection_date': datetime.now().isoformat(),
                'evidence_period_start': start_date.isoformat(),
                'evidence_period_end': end_date.isoformat(),
                'collector_version': '2.0',
                'aws_account_id': self.session.client('sts').get_caller_identity()['Account']
            },
            'control_evidence': {}
        }

        # Collect evidence for each Trust Service Criteria
        for criteria_id, description in self.trust_criteria.items():
            self.logger.info(f"Collecting evidence for {criteria_id}: {description}")
            evidence_package['control_evidence'][criteria_id] = \
                self._collect_criteria_evidence(criteria_id, start_date, end_date)

        # Generate evidence integrity hashes
        evidence_package['integrity'] = self._generate_evidence_integrity(evidence_package)

        return evidence_package

    def _collect_criteria_evidence(self, criteria_id: str,
                                 start_date: datetime, end_date: datetime) -> Dict:
        """Collect evidence for specific Trust Service Criteria"""

        evidence = {
            'criteria_id': criteria_id,
            'criteria_description': self.trust_criteria[criteria_id],
            'evidence_items': [],
            'metrics': {},
            'compliance_status': 'PENDING'
        }

        if criteria_id == 'CC1':  # Control Environment
            evidence['evidence_items'].extend(self._collect_cc1_evidence(start_date, end_date))
        elif criteria_id == 'CC2':  # Communication and Information
            evidence['evidence_items'].extend(self._collect_cc2_evidence(start_date, end_date))
        elif criteria_id == 'CC6':  # Logical and Physical Access Controls
            evidence['evidence_items'].extend(self._collect_cc6_evidence(start_date, end_date))
        elif criteria_id == 'CC7':  # System Operations
            evidence['evidence_items'].extend(self._collect_cc7_evidence(start_date, end_date))
        elif criteria_id == 'CC8':  # Change Management
            evidence['evidence_items'].extend(self._collect_cc8_evidence(start_date, end_date))

        # Calculate compliance metrics
        evidence['metrics'] = self._calculate_compliance_metrics(evidence['evidence_items'])
        evidence['compliance_status'] = self._determine_compliance_status(evidence['metrics'])

        return evidence

    def _collect_cc1_evidence(self, start_date: datetime, end_date: datetime) -> List[Dict]:
        """Collect CC1 (Control Environment) evidence"""

        evidence_items = []

        # CC1.1: Management establishes structures, reporting lines, and authorities
        org_policies = self._get_iam_policies_evidence()
        evidence_items.append({
            'control_id': 'CC1.1',
            'control_description': 'Organizational structure and authority',
            'evidence_type': 'iam_policies',
            'evidence_data': org_policies,
            'collection_timestamp': datetime.now().isoformat(),
            'automated': True
        })

        # CC1.2: Board of directors and management establish oversight responsibilities
        governance_evidence = self._get_governance_evidence(start_date, end_date)
        evidence_items.append({
            'control_id': 'CC1.2',
            'control_description': 'Governance and oversight',
            'evidence_type': 'governance_activities',
            'evidence_data': governance_evidence,
            'collection_timestamp': datetime.now().isoformat(),
            'automated': True
        })

        return evidence_items

    def _collect_cc6_evidence(self, start_date: datetime, end_date: datetime) -> List[Dict]:
        """Collect CC6 (Logical and Physical Access Controls) evidence"""

        evidence_items = []

        # CC6.1: Access controls restrict unauthorized access
        access_controls = self._get_access_control_evidence(start_date, end_date)
        evidence_items.append({
            'control_id': 'CC6.1',
            'control_description': 'Logical access controls',
            'evidence_type': 'access_controls',
            'evidence_data': access_controls,
            'collection_timestamp': datetime.now().isoformat(),
            'automated': True
        })

        # CC6.2: Privileged access is restricted and monitored
        privileged_access = self._get_privileged_access_evidence(start_date, end_date)
        evidence_items.append({
            'control_id': 'CC6.2',
            'control_description': 'Privileged access monitoring',
            'evidence_type': 'privileged_access_logs',
            'evidence_data': privileged_access,
            'collection_timestamp': datetime.now().isoformat(),
            'automated': True
        })

        # CC6.7: Data transmission and disposal controls
        encryption_evidence = self._get_encryption_evidence()
        evidence_items.append({
            'control_id': 'CC6.7',
            'control_description': 'Data transmission and encryption',
            'evidence_type': 'encryption_configuration',
            'evidence_data': encryption_evidence,
            'collection_timestamp': datetime.now().isoformat(),
            'automated': True
        })

        return evidence_items

    def _collect_cc7_evidence(self, start_date: datetime, end_date: datetime) -> List[Dict]:
        """Collect CC7 (System Operations) evidence"""

        evidence_items = []

        # CC7.1: System capacity and performance monitoring
        performance_monitoring = self._get_performance_monitoring_evidence(start_date, end_date)
        evidence_items.append({
            'control_id': 'CC7.1',
            'control_description': 'Performance and capacity monitoring',
            'evidence_type': 'performance_metrics',
            'evidence_data': performance_monitoring,
            'collection_timestamp': datetime.now().isoformat(),
            'automated': True
        })

        # CC7.2: System monitoring for security events
        security_monitoring = self._get_security_monitoring_evidence(start_date, end_date)
        evidence_items.append({
            'control_id': 'CC7.2',
            'control_description': 'Security event monitoring',
            'evidence_type': 'security_logs',
            'evidence_data': security_monitoring,
            'collection_timestamp': datetime.now().isoformat(),
            'automated': True
        })

        return evidence_items

    def _collect_cc8_evidence(self, start_date: datetime, end_date: datetime) -> List[Dict]:
        """Collect CC8 (Change Management) evidence"""

        evidence_items = []

        # CC8.1: Change management process
        change_management = self._get_change_management_evidence(start_date, end_date)
        evidence_items.append({
            'control_id': 'CC8.1',
            'control_description': 'Change management process',
            'evidence_type': 'infrastructure_changes',
            'evidence_data': change_management,
            'collection_timestamp': datetime.now().isoformat(),
            'automated': True
        })

        return evidence_items

    def _get_access_control_evidence(self, start_date: datetime, end_date: datetime) -> Dict:
        """Collect access control evidence from CloudTrail"""

        # Query CloudTrail for authentication and authorization events.
        # lookup_events returns at most 50 events per call, so use a paginator.
        paginator = self.cloudtrail.get_paginator('lookup_events')
        pages = paginator.paginate(
            LookupAttributes=[
                {
                    'AttributeKey': 'EventName',
                    'AttributeValue': 'AssumeRole'
                }
            ],
            StartTime=start_date,
            EndTime=end_date,
            PaginationConfig={'MaxItems': 1000}
        )

        access_events = []
        for page in pages:
            for event in page.get('Events', []):
                # Identity and network details live inside the CloudTrailEvent
                # JSON payload, not at the top level of the lookup response
                raw = json.loads(event.get('CloudTrailEvent', '{}'))
                access_events.append({
                    'event_time': event['EventTime'].isoformat(),
                    'event_name': event['EventName'],
                    'user_identity': raw.get('userIdentity', {}),
                    'source_ip': raw.get('sourceIPAddress'),
                    'user_agent': raw.get('userAgent'),
                    'resources': event.get('Resources', [])
                })

        # Get current IAM configuration (both list APIs are paginated)
        iam_users = [u for page in self.iam.get_paginator('list_users').paginate()
                     for u in page['Users']]
        iam_roles = [r for page in self.iam.get_paginator('list_roles').paginate()
                     for r in page['Roles']]

        return {
            'access_events_count': len(access_events),
            'access_events_sample': access_events[:50],  # First 50 for audit
            'iam_users_count': len(iam_users),
            'iam_roles_count': len(iam_roles),
            'user_summary': [
                {
                    'username': user['UserName'],
                    'create_date': user['CreateDate'].isoformat(),
                    'password_last_used': user['PasswordLastUsed'].isoformat()
                                          if 'PasswordLastUsed' in user else 'Never'
                }
                for user in iam_users
            ],
            'role_summary': [
                {
                    'role_name': role['RoleName'],
                    'create_date': role['CreateDate'].isoformat(),
                    'assume_role_policy': role['AssumeRolePolicyDocument']
                }
                for role in iam_roles
            ]
        }

    def _get_privileged_access_evidence(self, start_date: datetime, end_date: datetime) -> Dict:
        """Collect privileged access evidence"""

        # Query for privileged operations
        privileged_events = self.cloudtrail.lookup_events(
            LookupAttributes=[
                {
                    'AttributeKey': 'EventName',
                    'AttributeValue': 'CreateUser'
                }
            ],
            StartTime=start_date,
            EndTime=end_date
        )

        # Also check for administrative console access
        console_events = self.cloudtrail.lookup_events(
            LookupAttributes=[
                {
                    'AttributeKey': 'EventName',
                    'AttributeValue': 'ConsoleLogin'
                }
            ],
            StartTime=start_date,
            EndTime=end_date
        )

        return {
            'privileged_operations': len(privileged_events.get('Events', [])),
            'console_logins': len(console_events.get('Events', [])),
            'privileged_events_sample': [
                {
                    'event_time': event['EventTime'].isoformat(),
                    'event_name': event['EventName'],
                    'user_identity': event.get('UserIdentity', {}),
                    'source_ip': event.get('SourceIPAddress')
                }
                for event in privileged_events.get('Events', [])[:20]
            ],
            'console_login_sample': [
                {
                    'event_time': event['EventTime'].isoformat(),
                    'user_identity': event.get('UserIdentity', {}),
                    'source_ip': event.get('SourceIPAddress'),
                    'mfa_used': 'Yes' if '"mfaUsed":"true"' in event.get('CloudTrailEvent', '') else 'No'
                }
                for event in console_events.get('Events', [])[:20]
            ]
        }

    def _get_encryption_evidence(self) -> Dict:
        """Collect encryption configuration evidence"""

        # Get KMS keys
        kms_client = self.session.client('kms')
        keys = kms_client.list_keys()

        encryption_evidence = {
            'kms_keys_count': len(keys['Keys']),
            'kms_keys_details': []
        }

        for key in keys['Keys'][:20]:  # Sample first 20
            try:
                key_details = kms_client.describe_key(KeyId=key['KeyId'])
                key_rotation = kms_client.get_key_rotation_status(KeyId=key['KeyId'])

                encryption_evidence['kms_keys_details'].append({
                    'key_id': key['KeyId'],
                    'key_arn': key['Arn'],
                    'description': key_details['KeyMetadata'].get('Description', ''),
                    'key_usage': key_details['KeyMetadata'].get('KeyUsage', ''),
                    'key_state': key_details['KeyMetadata'].get('KeyState', ''),
                    'creation_date': key_details['KeyMetadata']['CreationDate'].isoformat()
                                   if key_details['KeyMetadata'].get('CreationDate') else '',
                    'rotation_enabled': key_rotation.get('KeyRotationEnabled', False)
                })
            except Exception as e:
                self.logger.warning(f"Could not get details for key {key['KeyId']}: {str(e)}")

        # Get S3 bucket encryption
        s3_buckets = self.s3.list_buckets()
        bucket_encryption = []

        for bucket in s3_buckets['Buckets'][:10]:  # Sample first 10
            try:
                encryption = self.s3.get_bucket_encryption(Bucket=bucket['Name'])
                bucket_encryption.append({
                    'bucket_name': bucket['Name'],
                    'encryption_algorithm': encryption['ServerSideEncryptionConfiguration']['Rules'][0]['ApplyServerSideEncryptionByDefault']['SSEAlgorithm'],
                    'kms_key_id': encryption['ServerSideEncryptionConfiguration']['Rules'][0]['ApplyServerSideEncryptionByDefault'].get('KMSMasterKeyID', 'Default')
                })
            except Exception:
                # get_bucket_encryption raises when a bucket has no default encryption
                bucket_encryption.append({
                    'bucket_name': bucket['Name'],
                    'encryption_algorithm': 'None',
                    'kms_key_id': 'None'
                })

        encryption_evidence['s3_bucket_encryption'] = bucket_encryption

        return encryption_evidence

    def _get_security_monitoring_evidence(self, start_date: datetime, end_date: datetime) -> Dict:
        """Collect security monitoring evidence"""

        # Get CloudWatch alarms
        alarms = self.cloudwatch.describe_alarms()

        # Get log groups for security monitoring
        log_groups = self.logs.describe_log_groups()

        # Get security-related CloudTrail events
        security_events = self.cloudtrail.lookup_events(
            LookupAttributes=[
                {
                    'AttributeKey': 'EventName',
                    'AttributeValue': 'CreateSecurityGroup'
                }
            ],
            StartTime=start_date,
            EndTime=end_date
        )

        return {
            'cloudwatch_alarms_count': len(alarms['MetricAlarms']),
            'security_log_groups': [
                lg['logGroupName'] for lg in log_groups['logGroups']
                if any(keyword in lg['logGroupName'].lower() for keyword in ['security', 'auth', 'access', 'audit'])
            ],
            'security_events_count': len(security_events.get('Events', [])),
            'monitoring_metrics': {
                'alarms_in_alarm_state': len([alarm for alarm in alarms['MetricAlarms'] if alarm['StateValue'] == 'ALARM']),
                'alarms_ok_state': len([alarm for alarm in alarms['MetricAlarms'] if alarm['StateValue'] == 'OK']),
                'log_groups_with_retention': len([lg for lg in log_groups['logGroups'] if lg.get('retentionInDays')])
            }
        }

    def _get_change_management_evidence(self, start_date: datetime, end_date: datetime) -> Dict:
        """Collect change management evidence"""

        # Get CloudFormation stack events
        cf_client = self.session.client('cloudformation')
        stacks = cf_client.list_stacks(StackStatusFilter=['CREATE_COMPLETE', 'UPDATE_COMPLETE'])

        change_events = []
        for stack in stacks['StackSummaries'][:20]:  # Sample first 20
            try:
                events = cf_client.describe_stack_events(StackName=stack['StackName'])
                stack_changes = [
                    {
                        'stack_name': stack['StackName'],
                        'event_id': event['EventId'],
                        'timestamp': event['Timestamp'].isoformat(),
                        'resource_type': event.get('ResourceType'),
                        'resource_status': event.get('ResourceStatus'),
                        'resource_status_reason': event.get('ResourceStatusReason')
                    }
                    for event in events['StackEvents']
                    if start_date <= event['Timestamp'].replace(tzinfo=None) <= end_date
                ]
                change_events.extend(stack_changes)
            except Exception as e:
                self.logger.warning(f"Could not get events for stack {stack['StackName']}: {str(e)}")

        # Get EC2 instance state changes
        ec2_events = self.cloudtrail.lookup_events(
            LookupAttributes=[
                {
                    'AttributeKey': 'EventName',
                    'AttributeValue': 'RunInstances'
                }
            ],
            StartTime=start_date,
            EndTime=end_date
        )

        return {
            'cloudformation_changes': len(change_events),
            'ec2_instance_launches': len(ec2_events.get('Events', [])),
            'change_events_sample': change_events[:50],
            'infrastructure_change_summary': {
                'total_changes': len(change_events) + len(ec2_events.get('Events', [])),
                'cloudformation_stacks': len(stacks['StackSummaries']),
                'change_frequency_per_day': len(change_events) / max((end_date - start_date).days, 1)
            }
        }

    def _calculate_compliance_metrics(self, evidence_items: List[Dict]) -> Dict:
        """Calculate compliance metrics from evidence"""

        metrics = {
            'total_controls': len(evidence_items),
            'automated_controls': len([item for item in evidence_items if item.get('automated', False)]),
            'manual_controls': len([item for item in evidence_items if not item.get('automated', False)]),
            'evidence_completeness': 0,
            'control_effectiveness': 0
        }

        # Calculate evidence completeness
        complete_evidence = len([item for item in evidence_items if item.get('evidence_data')])
        metrics['evidence_completeness'] = (complete_evidence / len(evidence_items) * 100) if evidence_items else 0

        # Calculate control effectiveness (simplified scoring)
        effectiveness_scores = []
        for item in evidence_items:
            evidence_data = item.get('evidence_data', {})
            if isinstance(evidence_data, dict):
                # Simple scoring based on evidence richness
                score = min(100, len(str(evidence_data)) / 100)  # Basic scoring
                effectiveness_scores.append(score)

        metrics['control_effectiveness'] = sum(effectiveness_scores) / len(effectiveness_scores) if effectiveness_scores else 0

        return metrics

    def _determine_compliance_status(self, metrics: Dict) -> str:
        """Determine overall compliance status"""

        completeness = metrics.get('evidence_completeness', 0)
        effectiveness = metrics.get('control_effectiveness', 0)

        if completeness >= 95 and effectiveness >= 80:
            return 'COMPLIANT'
        elif completeness >= 85 and effectiveness >= 70:
            return 'MOSTLY_COMPLIANT'
        elif completeness >= 70:
            return 'PARTIALLY_COMPLIANT'
        else:
            return 'NON_COMPLIANT'

    def _generate_evidence_integrity(self, evidence_package: Dict) -> Dict:
        """Generate integrity hashes for evidence package"""

        # Create hash of evidence data
        evidence_string = json.dumps(evidence_package['control_evidence'], sort_keys=True)
        evidence_hash = hashlib.sha256(evidence_string.encode()).hexdigest()

        return {
            'evidence_hash': evidence_hash,
            'collection_timestamp': datetime.now().isoformat(),
            'integrity_algorithm': 'SHA256',
            'evidence_size_bytes': len(evidence_string)
        }

    def generate_audit_report(self, evidence_package: Dict) -> str:
        """Generate human-readable audit report"""

        report = f"""
# SOC2 Type II Evidence Collection Report
Generated: {evidence_package['collection_metadata']['collection_date']}

## Collection Summary
- **Evidence Period**: {evidence_package['collection_metadata']['evidence_period_start']} to {evidence_package['collection_metadata']['evidence_period_end']}
- **AWS Account**: {evidence_package['collection_metadata']['aws_account_id']}
- **Collection Method**: Automated
- **Evidence Integrity Hash**: {evidence_package['integrity']['evidence_hash']}

## Control Evidence Summary

"""

        for criteria_id, evidence in evidence_package['control_evidence'].items():
            report += f"""
### {criteria_id}: {evidence['criteria_description']}
- **Compliance Status**: {evidence['compliance_status']}
- **Evidence Items**: {len(evidence['evidence_items'])}
- **Evidence Completeness**: {evidence['metrics']['evidence_completeness']:.1f}%
- **Control Effectiveness**: {evidence['metrics']['control_effectiveness']:.1f}%

"""

            for item in evidence['evidence_items']:
                report += f"""
#### {item['control_id']}: {item['control_description']}
- **Evidence Type**: {item['evidence_type']}
- **Collection Method**: {'Automated' if item['automated'] else 'Manual'}
- **Timestamp**: {item['collection_timestamp']}

"""

        return report

if __name__ == "__main__":
    # Example usage
    collector = SOC2EvidenceCollector()

    # Collect evidence for the last 90 days
    end_date = datetime.now()
    start_date = end_date - timedelta(days=90)

    evidence = collector.collect_comprehensive_evidence(start_date, end_date)

    # Save evidence package
    with open('soc2-evidence-package.json', 'w') as f:
        json.dump(evidence, f, indent=2, default=str)

    # Generate audit report
    report = collector.generate_audit_report(evidence)
    with open('soc2-audit-report.md', 'w') as f:
        f.write(report)

    print("SOC2 evidence collection completed!")
    print("Evidence package: soc2-evidence-package.json")
    print("Audit report: soc2-audit-report.md")
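Because `_generate_evidence_integrity` hashes a canonical JSON serialization (sorted keys over `control_evidence`), an auditor can independently re-verify a saved package. A minimal sketch of that check; the function name is illustrative, and it assumes the package layout produced by the collector above, with JSON-native values (ISO-8601 strings, numbers) inside `control_evidence`:

```python
# Hypothetical auditor-side verification of a saved evidence package.
import hashlib
import json

def verify_evidence_integrity(package_path: str) -> bool:
    """Return True if the package's control evidence still matches
    the SHA-256 recorded in its integrity block."""
    with open(package_path) as f:
        package = json.load(f)

    # Same canonicalization the collector used: sorted keys, default separators
    evidence_string = json.dumps(package['control_evidence'], sort_keys=True)
    recomputed = hashlib.sha256(evidence_string.encode()).hexdigest()
    return recomputed == package['integrity']['evidence_hash']
```

Run against `soc2-evidence-package.json`, a `False` result means the evidence was modified after collection, which is exactly the tamper signal auditors look for.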

2. Automated Compliance Dashboard

#!/usr/bin/env python3
# soc2-automation/compliance_dashboard.py

import boto3
import json
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from datetime import datetime, timedelta
from typing import Dict, List

# Reuse the evidence collector defined in soc2_evidence_collector.py
from soc2_evidence_collector import SOC2EvidenceCollector

class SOC2ComplianceDashboard:
    def __init__(self):
        self.evidence_collector = SOC2EvidenceCollector()

    def create_dashboard(self):
        """Create Streamlit dashboard for SOC2 compliance monitoring"""

        st.set_page_config(
            page_title="SOC2 Type II Compliance Dashboard",
            page_icon="🔒",
            layout="wide"
        )

        st.title("🔒 SOC2 Type II Compliance Dashboard")
        st.markdown("Real-time compliance monitoring and evidence collection")

        # Sidebar controls
        st.sidebar.header("Dashboard Controls")

        # Date range selector
        col1, col2 = st.sidebar.columns(2)
        with col1:
            start_date = st.date_input("Start Date", datetime.now() - timedelta(days=90))
        with col2:
            end_date = st.date_input("End Date", datetime.now())

        # Refresh data button
        if st.sidebar.button("Refresh Evidence"):
            with st.spinner("Collecting evidence..."):
                evidence = self.evidence_collector.collect_comprehensive_evidence(
                    datetime.combine(start_date, datetime.min.time()),
                    datetime.combine(end_date, datetime.min.time())
                )
                st.session_state['evidence'] = evidence

        # Load evidence data
        if 'evidence' not in st.session_state:
            with st.spinner("Loading initial evidence..."):
                evidence = self.evidence_collector.collect_comprehensive_evidence(
                    datetime.combine(start_date, datetime.min.time()),
                    datetime.combine(end_date, datetime.min.time())
                )
                st.session_state['evidence'] = evidence

        evidence = st.session_state['evidence']

        # Main dashboard
        self._create_overview_section(evidence)
        self._create_compliance_metrics(evidence)
        self._create_control_details(evidence)
        self._create_evidence_timeline(evidence)

    def _create_overview_section(self, evidence: Dict):
        """Create overview section with key metrics"""

        st.header("📊 Compliance Overview")

        # Calculate overall metrics
        total_controls = sum(len(ctrl['evidence_items']) for ctrl in evidence['control_evidence'].values())
        compliant_controls = sum(
            len([item for item in ctrl['evidence_items'] if item.get('automated', False)])
            for ctrl in evidence['control_evidence'].values()
        )

        overall_completeness = sum(
            ctrl['metrics']['evidence_completeness']
            for ctrl in evidence['control_evidence'].values()
        ) / len(evidence['control_evidence'])

        overall_effectiveness = sum(
            ctrl['metrics']['control_effectiveness']
            for ctrl in evidence['control_evidence'].values()
        ) / len(evidence['control_evidence'])

        # Display key metrics
        col1, col2, col3, col4 = st.columns(4)

        with col1:
            st.metric(
                label="Overall Compliance",
                value=f"{overall_completeness:.1f}%",
                delta=f"{overall_completeness - 85:.1f}%"  # delta vs. 85% threshold
            )

        with col2:
            st.metric(
                label="Control Effectiveness",
                value=f"{overall_effectiveness:.1f}%",
                delta=f"{overall_effectiveness - 80:.1f}%"  # delta vs. 80% threshold
            )

        with col3:
            st.metric(
                label="Total Controls",
                value=total_controls,
                delta=f"🤖 {compliant_controls} automated"
            )

        with col4:
            st.metric(
                label="Evidence Period",
                value=f"{(datetime.fromisoformat(evidence['collection_metadata']['evidence_period_end']) - datetime.fromisoformat(evidence['collection_metadata']['evidence_period_start'])).days} days",
                delta="Continuous collection"
            )

    def _create_compliance_metrics(self, evidence: Dict):
        """Create compliance metrics visualization"""

        st.header("📈 Trust Service Criteria Compliance")

        # Prepare data for visualization
        criteria_data = []
        for criteria_id, ctrl_evidence in evidence['control_evidence'].items():
            criteria_data.append({
                'Criteria': criteria_id,
                'Description': ctrl_evidence['criteria_description'],
                'Completeness': ctrl_evidence['metrics']['evidence_completeness'],
                'Effectiveness': ctrl_evidence['metrics']['control_effectiveness'],
                'Status': ctrl_evidence['compliance_status'],
                'Controls': len(ctrl_evidence['evidence_items'])
            })

        df = pd.DataFrame(criteria_data)

        # Create compliance heatmap
        fig_heatmap = px.imshow(
            df[['Completeness', 'Effectiveness']].T,
            labels=dict(x="Trust Service Criteria", y="Metrics", color="Score"),
            x=df['Criteria'],
            y=['Evidence Completeness', 'Control Effectiveness'],
            color_continuous_scale='RdYlGn',
            aspect="auto"
        )
        fig_heatmap.update_layout(title="Compliance Heatmap by Trust Service Criteria")
        st.plotly_chart(fig_heatmap, use_container_width=True)

        # Compliance status distribution
        col1, col2 = st.columns(2)

        with col1:
            status_counts = df['Status'].value_counts()
            fig_pie = px.pie(
                values=status_counts.values,
                names=status_counts.index,
                title="Compliance Status Distribution"
            )
            st.plotly_chart(fig_pie, use_container_width=True)

        with col2:
            fig_bar = px.bar(
                df,
                x='Criteria',
                y=['Completeness', 'Effectiveness'],
                title="Compliance Scores by Criteria",
                barmode='group'
            )
            st.plotly_chart(fig_bar, use_container_width=True)

    def _create_control_details(self, evidence: Dict):
        """Create detailed control information"""

        st.header("🔍 Control Details")

        # Allow users to select criteria
        selected_criteria = st.selectbox(
            "Select Trust Service Criteria",
            options=list(evidence['control_evidence'].keys()),
            format_func=lambda x: f"{x}: {evidence['control_evidence'][x]['criteria_description']}"
        )

        if selected_criteria:
            ctrl_evidence = evidence['control_evidence'][selected_criteria]

            # Display criteria information
            col1, col2 = st.columns(2)

            with col1:
                st.subheader(f"{selected_criteria}: {ctrl_evidence['criteria_description']}")
                st.write(f"**Compliance Status:** {ctrl_evidence['compliance_status']}")
                st.write(f"**Evidence Items:** {len(ctrl_evidence['evidence_items'])}")
                st.write(f"**Evidence Completeness:** {ctrl_evidence['metrics']['evidence_completeness']:.1f}%")
                st.write(f"**Control Effectiveness:** {ctrl_evidence['metrics']['control_effectiveness']:.1f}%")

            with col2:
                # Control effectiveness gauge
                fig_gauge = go.Figure(go.Indicator(
                    mode = "gauge+number",
                    value = ctrl_evidence['metrics']['control_effectiveness'],
                    domain = {'x': [0, 1], 'y': [0, 1]},
                    title = {'text': "Control Effectiveness"},
                    gauge = {
                        'axis': {'range': [None, 100]},
                        'bar': {'color': "darkblue"},
                        'steps': [
                            {'range': [0, 50], 'color': "lightgray"},
                            {'range': [50, 80], 'color': "yellow"},
                            {'range': [80, 100], 'color': "green"}
                        ],
                        'threshold': {
                            'line': {'color': "red", 'width': 4},
                            'thickness': 0.75,
                            'value': 90
                        }
                    }
                ))
                st.plotly_chart(fig_gauge, use_container_width=True)

            # Display evidence items
            st.subheader("Evidence Items")

            evidence_df = pd.DataFrame([
                {
                    'Control ID': item['control_id'],
                    'Description': item['control_description'],
                    'Evidence Type': item['evidence_type'],
                    'Automated': '✅' if item['automated'] else '❌',
                    'Collection Time': item['collection_timestamp']
                }
                for item in ctrl_evidence['evidence_items']
            ])

            st.dataframe(evidence_df, use_container_width=True)

            # Show evidence data for selected control
            selected_control = st.selectbox(
                "View evidence data for control:",
                options=[item['control_id'] for item in ctrl_evidence['evidence_items']]
            )

            if selected_control:
                control_item = next(
                    item for item in ctrl_evidence['evidence_items']
                    if item['control_id'] == selected_control
                )

                with st.expander(f"Evidence Data for {selected_control}"):
                    st.json(control_item['evidence_data'])

    def _create_evidence_timeline(self, evidence: Dict):
        """Create evidence collection timeline"""

        st.header("📅 Evidence Collection Timeline")

        # Prepare timeline data
        timeline_data = []
        for criteria_id, ctrl_evidence in evidence['control_evidence'].items():
            for item in ctrl_evidence['evidence_items']:
                timeline_data.append({
                    'timestamp': datetime.fromisoformat(item['collection_timestamp']),
                    'criteria': criteria_id,
                    'control_id': item['control_id'],
                    'evidence_type': item['evidence_type'],
                    'automated': item['automated']
                })

        if timeline_data:
            df_timeline = pd.DataFrame(timeline_data)

            # Create timeline chart
            fig_timeline = px.scatter(
                df_timeline,
                x='timestamp',
                y='criteria',
                color='evidence_type',
                symbol='automated',
                title="Evidence Collection Timeline",
                hover_data=['control_id', 'evidence_type']
            )
            fig_timeline.update_layout(height=400)
            st.plotly_chart(fig_timeline, use_container_width=True)

        # Evidence collection summary
        st.subheader("Collection Summary")
        col1, col2 = st.columns(2)

        with col1:
            st.write(f"**Collection Date:** {evidence['collection_metadata']['collection_date']}")
            st.write(f"**AWS Account:** {evidence['collection_metadata']['aws_account_id']}")
            st.write(f"**Collector Version:** {evidence['collection_metadata']['collector_version']}")

        with col2:
            st.write(f"**Evidence Hash:** {evidence['integrity']['evidence_hash'][:16]}...")
            st.write(f"**Evidence Size:** {evidence['integrity']['evidence_size_bytes']:,} bytes")
            st.write(f"**Integrity Algorithm:** {evidence['integrity']['integrity_algorithm']}")

if __name__ == "__main__":
    # Launch with: streamlit run soc2-automation/compliance_dashboard.py
    dashboard = SOC2ComplianceDashboard()
    dashboard.create_dashboard()

Continuous Compliance Automation

CI/CD Integration for SOC2 Compliance

1. GitHub Actions SOC2 Compliance Workflow

# .github/workflows/soc2-compliance.yml
name: SOC2 Compliance Validation

on:
  schedule:
    - cron: '0 0 * * *' # Daily compliance check
  push:
    branches: [main]
    paths: ['infrastructure/**', 'terraform/**']
  workflow_dispatch:
    inputs:
      evidence_period_days:
        description: 'Evidence collection period in days'
        required: false
        default: '30'

env:
  AWS_DEFAULT_REGION: us-east-1
  TERRAFORM_VERSION: '1.6.6'

jobs:
  infrastructure-compliance:
    name: Infrastructure Compliance Validation
    runs-on: ubuntu-latest
    permissions:
      id-token: write
      contents: read
      security-events: write

    steps:
      - name: Checkout Code
        uses: actions/checkout@v4

      - name: Configure AWS Credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: ${{ secrets.AWS_OIDC_ROLE }}
          aws-region: ${{ env.AWS_DEFAULT_REGION }}

      - name: Setup Terraform
        uses: hashicorp/setup-terraform@v3
        with:
          terraform_version: ${{ env.TERRAFORM_VERSION }}

      - name: Validate SOC2 Terraform Configurations
        run: |
          # Validate all Terraform configurations for SOC2 compliance
          find infrastructure/ -name "*.tf" -type f | while read tf_file; do
            echo "Validating SOC2 compliance for: $tf_file"
            
            # Check for required SOC2 tags
            if ! grep -q "SOC2:Control" "$tf_file"; then
              echo "❌ Missing SOC2:Control tag in $tf_file"
              exit 1
            fi
            
            # Check for encryption configurations
            if grep -q "aws_s3_bucket\|aws_ebs_volume\|aws_rds" "$tf_file"; then
              if ! grep -q "kms_key_id\|encryption" "$tf_file"; then
                echo "⚠️ Potential encryption issue in $tf_file"
              fi
            fi
            
            # Check for logging configurations
            if grep -q "aws_vpc\|aws_lb" "$tf_file"; then
              if ! grep -q "flow_log\|access_logs" "$tf_file"; then
                echo "⚠️ Potential logging issue in $tf_file"
              fi
            fi
          done

      - name: Terraform Plan with SOC2 Validation
        run: |
          cd infrastructure/
          terraform init
          terraform plan -out=tfplan.binary
          terraform show -json tfplan.binary > tfplan.json

      - name: SOC2 Compliance Scanning
        run: |
          # Install compliance scanning tools
          pip install boto3 pandas streamlit plotly

          # Run SOC2 evidence collection
          python3 << 'EOF'
          import sys
          import os
          sys.path.append('scripts/')

          from soc2_evidence_collector import SOC2EvidenceCollector
          from datetime import datetime, timedelta

          # Collect evidence
          collector = SOC2EvidenceCollector()

          evidence_days = int(os.getenv('EVIDENCE_PERIOD_DAYS', '30'))
          end_date = datetime.now()
          start_date = end_date - timedelta(days=evidence_days)

          evidence = collector.collect_comprehensive_evidence(start_date, end_date)

          # Generate compliance report
          report = collector.generate_audit_report(evidence)

          # Save artifacts
          import json
          with open('soc2-evidence.json', 'w') as f:
              json.dump(evidence, f, indent=2, default=str)

          with open('soc2-compliance-report.md', 'w') as f:
              f.write(report)

          # Check compliance thresholds
          overall_completeness = sum(
              ctrl['metrics']['evidence_completeness'] 
              for ctrl in evidence['control_evidence'].values()
          ) / len(evidence['control_evidence'])

          print(f"Overall compliance: {overall_completeness:.1f}%")

          if overall_completeness < 85:
              print("❌ Compliance below threshold (85%)")
              sys.exit(1)
          else:
              print("✅ Compliance meets threshold")
          EOF
        env:
          EVIDENCE_PERIOD_DAYS: ${{ github.event.inputs.evidence_period_days || '30' }}

      - name: Upload SOC2 Evidence Artifacts
        uses: actions/upload-artifact@v4
        with:
          name: soc2-compliance-evidence
          path: |
            soc2-evidence.json
            soc2-compliance-report.md
            tfplan.json
          retention-days: 90 # GitHub caps artifact retention; 7-year SOC2 retention is handled by the S3 archive job

      - name: Create Compliance Issue
        if: failure()
        uses: actions/github-script@v7
        with:
          script: |
            github.rest.issues.create({
              owner: context.repo.owner,
              repo: context.repo.repo,
              title: `SOC2 Compliance Failure - ${new Date().toISOString()}`,
              body: `## SOC2 Compliance Check Failed
              
              **Workflow Run:** ${{ github.run_id }}
              **Branch:** ${{ github.ref }}
              **Commit:** ${{ github.sha }}
              
              Please review the compliance report and address any issues.`,
              labels: ['compliance', 'soc2', 'urgent']
            });

  evidence-archive:
    name: Archive Evidence for Audit
    runs-on: ubuntu-latest
    needs: infrastructure-compliance
    if: github.ref == 'refs/heads/main'

    steps:
      - name: Download Evidence Artifacts
        uses: actions/download-artifact@v4
        with:
          name: soc2-compliance-evidence

      - name: Configure AWS Credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: ${{ secrets.AWS_OIDC_ROLE }}
          aws-region: ${{ env.AWS_DEFAULT_REGION }}

      - name: Archive Evidence to S3
        run: |
          # Create timestamped archive
          TIMESTAMP=$(date +"%Y-%m-%d_%H-%M-%S")
          ARCHIVE_PREFIX="soc2-evidence/${TIMESTAMP}"

          # Upload evidence with proper metadata
          aws s3 cp soc2-evidence.json s3://${{ secrets.SOC2_EVIDENCE_BUCKET }}/${ARCHIVE_PREFIX}/evidence.json \
            --metadata "compliance-framework=SOC2-Type-II,collection-date=${TIMESTAMP},retention-years=7"

          aws s3 cp soc2-compliance-report.md s3://${{ secrets.SOC2_EVIDENCE_BUCKET }}/${ARCHIVE_PREFIX}/report.md \
            --metadata "compliance-framework=SOC2-Type-II,collection-date=${TIMESTAMP},retention-years=7"

          # Create evidence integrity hash
          sha256sum soc2-evidence.json > evidence-integrity.sha256
          aws s3 cp evidence-integrity.sha256 s3://${{ secrets.SOC2_EVIDENCE_BUCKET }}/${ARCHIVE_PREFIX}/integrity.sha256

          echo "Evidence archived to: s3://${{ secrets.SOC2_EVIDENCE_BUCKET }}/${ARCHIVE_PREFIX}/"

Business Impact and ROI

SOC2 Automation ROI Analysis

Implementation Costs vs. Benefits:

| Category | Manual SOC2 | Automated SOC2 | Savings |
|---|---|---|---|
| Annual Audit Preparation | 2,000 hours | 400 hours | $240K |
| Evidence Collection | 800 hours | 80 hours | $108K |
| Compliance Monitoring | 1,200 hours | 200 hours | $150K |
| Audit Duration | 8 weeks | 3 weeks | $75K |
| Documentation Effort | 600 hours | 100 hours | $75K |
| **Total Annual Savings** | - | - | **$648K** |

ROI Calculation:

# Annual SOC2 automation value
AUDIT_PREPARATION_SAVINGS = 240000      # Reduced preparation time
EVIDENCE_COLLECTION_SAVINGS = 108000    # Automated evidence gathering
CONTINUOUS_MONITORING_SAVINGS = 150000  # Real-time compliance
AUDIT_DURATION_SAVINGS = 75000          # Faster audit completion
DOCUMENTATION_SAVINGS = 75000           # Automated reporting

TOTAL_ANNUAL_SAVINGS = (AUDIT_PREPARATION_SAVINGS + EVIDENCE_COLLECTION_SAVINGS +
                        CONTINUOUS_MONITORING_SAVINGS + AUDIT_DURATION_SAVINGS +
                        DOCUMENTATION_SAVINGS)
# Total Savings: $648,000 annually

IMPLEMENTATION_COST = 150000  # Initial automation setup
ANNUAL_MAINTENANCE = 30000    # Ongoing maintenance

FIRST_YEAR_ROI = ((TOTAL_ANNUAL_SAVINGS - IMPLEMENTATION_COST - ANNUAL_MAINTENANCE) /
                  (IMPLEMENTATION_COST + ANNUAL_MAINTENANCE)) * 100
# ROI: 260% in first year

ONGOING_ROI = ((TOTAL_ANNUAL_SAVINGS - ANNUAL_MAINTENANCE) / ANNUAL_MAINTENANCE) * 100
# Ongoing ROI: 2,060% annually

Conclusion

SOC2 Type II automation transforms compliance from a burdensome annual process into a continuous, value-adding business capability. By implementing Infrastructure as Code with embedded security controls, automated evidence collection, and continuous compliance monitoring, organizations achieve both regulatory compliance and improved security posture.

The key to successful SOC2 automation lies in building compliance into your development and operations workflows from the beginning, rather than treating it as an after-the-fact audit exercise. This approach reduces compliance costs while improving actual security outcomes.

Remember that SOC2 automation is not about checking boxes; it's about building systems that provide continuous assurance about your security controls and business processes.

Your SOC2 automation journey starts with implementing infrastructure as code with compliance tagging. Begin today and build towards continuous compliance validation.
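That first step can be as small as a pure tagging check your pipeline runs per resource. A minimal sketch: the `SOC2:Control` key matches what the GitHub Actions workflow earlier in this article greps Terraform files for, while the other tag keys are illustrative assumptions to align with your own tagging standard.

```python
# Illustrative compliance-tagging gate; tag keys other than SOC2:Control
# are hypothetical and should match your organization's tagging standard.
from typing import Dict, List

REQUIRED_SOC2_TAGS = ['SOC2:Control', 'SOC2:Owner', 'SOC2:DataClassification']

def missing_soc2_tags(tags: Dict[str, str]) -> List[str]:
    """Return required SOC2 tag keys that are absent or empty on a resource."""
    return [key for key in REQUIRED_SOC2_TAGS if not tags.get(key)]

# Example: a bucket tagged with a control and owner but no data classification
bucket_tags = {'SOC2:Control': 'CC6.1', 'SOC2:Owner': 'platform-team'}
print(missing_soc2_tags(bucket_tags))  # ['SOC2:DataClassification']
```

Wire a check like this into your plan stage (failing the build when the list is non-empty) and every new resource enters production already mapped to a control, which is what makes the continuous evidence collection above possible.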