# SOC2 Type II Automation: Evidence Collection with Infrastructure as Code

## The SOC2 Type II Automation Challenge
Your organization undergoes SOC2 Type II audits annually, requiring months of manual evidence collection, policy documentation, and control validation across hundreds of systems and processes. Auditors request evidence spanning 12 months of operations, forcing your team to retroactively gather logs, screenshots, and documentation that may be incomplete or inconsistent. This manual approach creates audit anxiety, consumes significant resources, and provides limited assurance about actual security posture.
SOC2 Type II automation transforms compliance from a periodic scramble into continuous validation, providing real-time evidence collection, automated policy enforcement, and comprehensive audit trails that reduce audit time from months to weeks while improving actual security posture.
## SOC2 Type II Framework for DevSecOps
SOC2 Type II evaluates the effectiveness of controls over time, requiring evidence of consistent implementation across all Trust Service Criteria: Security, Availability, Processing Integrity, Confidentiality, and Privacy. Modern cloud-native environments enable automated evidence collection and continuous compliance validation.
### Core SOC2 Type II Automation Components

**1. Continuous Evidence Collection**
- Automated log aggregation and retention across all systems
- Real-time policy compliance monitoring and validation
- Configuration management and drift detection
- Identity and access management audit trails

**2. Automated Control Implementation**
- Infrastructure as Code with embedded security controls
- Policy as Code for automated governance enforcement
- Continuous security monitoring and alerting
- Automated remediation and response workflows

**3. Audit Trail Generation**
- Immutable audit logs with cryptographic verification (see the sketch after this list)
- Automated compliance reporting and dashboards
- Evidence packaging and audit support automation
- Historical compliance analysis and trending
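To make the "immutable audit logs with cryptographic verification" idea concrete, the sketch below hash-chains log entries so that editing any past entry invalidates every subsequent digest. It is a minimal illustration, not a production ledger; managed services such as CloudTrail log file validation provide this property natively.

```python
#!/usr/bin/env python3
# Minimal hash-chained audit trail: tampering with any entry breaks the chain.
import hashlib
import json
from datetime import datetime, timezone


class AuditChain:
    def __init__(self):
        self.entries = []
        self._last_hash = "0" * 64  # genesis hash

    def append(self, event: dict) -> dict:
        """Record an event, linking it to the previous entry's hash."""
        entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "event": event,
            "prev_hash": self._last_hash,
        }
        payload = json.dumps(entry, sort_keys=True).encode()
        entry["hash"] = hashlib.sha256(payload).hexdigest()
        self._last_hash = entry["hash"]
        self.entries.append(entry)
        return entry

    def verify(self) -> bool:
        """Recompute every digest; any modification surfaces as a mismatch."""
        prev = "0" * 64
        for entry in self.entries:
            body = {k: v for k, v in entry.items() if k != "hash"}
            if entry["prev_hash"] != prev:
                return False
            if hashlib.sha256(json.dumps(body, sort_keys=True).encode()).hexdigest() != entry["hash"]:
                return False
            prev = entry["hash"]
        return True


if __name__ == "__main__":
    chain = AuditChain()
    chain.append({"action": "ConsoleLogin", "user": "alice", "mfa": True})
    chain.append({"action": "AssumeRole", "role": "deploy"})
    print("chain valid:", chain.verify())          # True
    chain.entries[0]["event"]["user"] = "mallory"  # tamper with history
    print("after tampering:", chain.verify())      # False
```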
## Infrastructure as Code for SOC2 Compliance
Infrastructure as Code provides the foundation for consistent, auditable, and repeatable security control implementation across all environments.
### SOC2-Compliant Infrastructure Templates

**1. Security-Focused Terraform Modules**
```hcl
# terraform/modules/soc2-compliant-vpc/main.tf

terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
  }
}

# SOC2 Security Control: Network Segmentation (CC6.1)
resource "aws_vpc" "soc2_vpc" {
  cidr_block           = var.vpc_cidr
  enable_dns_hostnames = true
  enable_dns_support   = true

  tags = merge(var.common_tags, {
    Name                   = "${var.environment}-soc2-vpc"
    "SOC2:Control"         = "CC6.1"
    "SOC2:Description"     = "Network segmentation and access controls"
    "Compliance:Framework" = "SOC2-Type-II"
    "Audit:Required"       = "true"
  })
}

# SOC2 Security Control: VPC Flow Logs (CC7.2)
resource "aws_flow_log" "vpc_flow_log" {
  iam_role_arn    = aws_iam_role.flow_log_role.arn
  log_destination = aws_cloudwatch_log_group.vpc_flow_log.arn
  traffic_type    = "ALL"
  vpc_id          = aws_vpc.soc2_vpc.id

  tags = merge(var.common_tags, {
    Name                  = "${var.environment}-vpc-flow-logs"
    "SOC2:Control"        = "CC7.2"
    "SOC2:Description"    = "Network monitoring and logging"
    "Audit:RetentionDays" = "2557" # 7 years for SOC2
  })
}

# SOC2 Security Control: Encrypted CloudWatch Logs (CC6.7)
resource "aws_cloudwatch_log_group" "vpc_flow_log" {
  name              = "/aws/vpc/flowlogs/${var.environment}"
  retention_in_days = 2557 # 7 years retention for SOC2
  kms_key_id        = aws_kms_key.soc2_logging_key.arn

  tags = merge(var.common_tags, {
    "SOC2:Control"     = "CC6.7"
    "SOC2:Description" = "Encrypted audit logging"
    "Audit:Critical"   = "true"
  })
}

# SOC2 Security Control: Encryption Key Management (CC6.7)
resource "aws_kms_key" "soc2_logging_key" {
  description             = "SOC2 compliance logging encryption key"
  deletion_window_in_days = 30
  enable_key_rotation     = true

  # The key policy must grant the CloudWatch Logs service principal access,
  # scoped to this module's log group via the encryption context condition.
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "Enable IAM User Permissions"
        Effect = "Allow"
        Principal = {
          AWS = "arn:aws:iam::${data.aws_caller_identity.current.account_id}:root"
        }
        Action   = "kms:*"
        Resource = "*"
      },
      {
        Sid    = "Allow CloudWatch Logs"
        Effect = "Allow"
        Principal = {
          Service = "logs.${data.aws_region.current.name}.amazonaws.com"
        }
        Action = [
          "kms:Encrypt",
          "kms:Decrypt",
          "kms:ReEncrypt*",
          "kms:GenerateDataKey*",
          "kms:DescribeKey"
        ]
        Resource = "*"
        Condition = {
          ArnEquals = {
            "kms:EncryptionContext:aws:logs:arn" = "arn:aws:logs:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:log-group:/aws/vpc/flowlogs/${var.environment}"
          }
        }
      }
    ]
  })

  tags = merge(var.common_tags, {
    Name               = "${var.environment}-soc2-logging-key"
    "SOC2:Control"     = "CC6.7"
    "SOC2:Description" = "Encryption key for compliance logging"
    "Audit:KeyRotation" = "enabled"
  })
}

resource "aws_kms_alias" "soc2_logging_key_alias" {
  name          = "alias/${var.environment}-soc2-logging"
  target_key_id = aws_kms_key.soc2_logging_key.key_id
}

# SOC2 Security Control: Private Subnets (CC6.1)
resource "aws_subnet" "private_subnets" {
  count                   = length(var.private_subnet_cidrs)
  vpc_id                  = aws_vpc.soc2_vpc.id
  cidr_block              = var.private_subnet_cidrs[count.index]
  availability_zone       = data.aws_availability_zones.available.names[count.index]
  map_public_ip_on_launch = false

  tags = merge(var.common_tags, {
    Name               = "${var.environment}-private-subnet-${count.index + 1}"
    Type               = "Private"
    "SOC2:Control"     = "CC6.1"
    "SOC2:Description" = "Isolated private network segment"
    "kubernetes.io/role/internal-elb" = "1"
  })
}

# SOC2 Security Control: Public Subnets with NACLs (CC6.1)
resource "aws_subnet" "public_subnets" {
  count                   = length(var.public_subnet_cidrs)
  vpc_id                  = aws_vpc.soc2_vpc.id
  cidr_block              = var.public_subnet_cidrs[count.index]
  availability_zone       = data.aws_availability_zones.available.names[count.index]
  map_public_ip_on_launch = true

  tags = merge(var.common_tags, {
    Name               = "${var.environment}-public-subnet-${count.index + 1}"
    Type               = "Public"
    "SOC2:Control"     = "CC6.1"
    "SOC2:Description" = "Controlled public network access"
    "kubernetes.io/role/elb" = "1"
  })
}

# SOC2 Security Control: Network ACLs (CC6.1)
resource "aws_network_acl" "private_nacl" {
  vpc_id     = aws_vpc.soc2_vpc.id
  subnet_ids = aws_subnet.private_subnets[*].id

  # Allow inbound HTTPS from within the VPC
  ingress {
    protocol   = "tcp"
    rule_no    = 100
    action     = "allow"
    cidr_block = var.vpc_cidr
    from_port  = 443
    to_port    = 443
  }

  # Allow inbound HTTP from within the VPC (for health checks)
  ingress {
    protocol   = "tcp"
    rule_no    = 110
    action     = "allow"
    cidr_block = var.vpc_cidr
    from_port  = 80
    to_port    = 80
  }

  # Allow ephemeral ports for return traffic
  ingress {
    protocol   = "tcp"
    rule_no    = 120
    action     = "allow"
    cidr_block = "0.0.0.0/0"
    from_port  = 1024
    to_port    = 65535
  }

  # Allow all outbound traffic
  egress {
    protocol   = "-1"
    rule_no    = 100
    action     = "allow"
    cidr_block = "0.0.0.0/0"
    from_port  = 0
    to_port    = 0
  }

  tags = merge(var.common_tags, {
    Name               = "${var.environment}-private-nacl"
    "SOC2:Control"     = "CC6.1"
    "SOC2:Description" = "Network access control list for private subnets"
  })
}

# SOC2 Security Control: IAM Role for Flow Logs (CC6.2)
resource "aws_iam_role" "flow_log_role" {
  name = "${var.environment}-vpc-flow-log-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "vpc-flow-logs.amazonaws.com"
        }
      }
    ]
  })

  tags = merge(var.common_tags, {
    "SOC2:Control"     = "CC6.2"
    "SOC2:Description" = "IAM role for VPC flow logs service"
  })
}

resource "aws_iam_role_policy" "flow_log_policy" {
  name = "${var.environment}-vpc-flow-log-policy"
  role = aws_iam_role.flow_log_role.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents",
          "logs:DescribeLogGroups",
          "logs:DescribeLogStreams"
        ]
        Effect = "Allow"
        # PutLogEvents targets log streams, so grant access to the log
        # group and every stream beneath it.
        Resource = [
          aws_cloudwatch_log_group.vpc_flow_log.arn,
          "${aws_cloudwatch_log_group.vpc_flow_log.arn}:*"
        ]
      }
    ]
  })
}

# Data sources
data "aws_caller_identity" "current" {}
data "aws_region" "current" {}
data "aws_availability_zones" "available" {
  state = "available"
}

# Variables
variable "environment" {
  description = "Environment name (e.g., production, staging)"
  type        = string
}

variable "vpc_cidr" {
  description = "CIDR block for VPC"
  type        = string
  default     = "10.0.0.0/16"
}

variable "private_subnet_cidrs" {
  description = "CIDR blocks for private subnets"
  type        = list(string)
  default     = ["10.0.1.0/24", "10.0.2.0/24", "10.0.3.0/24"]
}

variable "public_subnet_cidrs" {
  description = "CIDR blocks for public subnets"
  type        = list(string)
  default     = ["10.0.101.0/24", "10.0.102.0/24", "10.0.103.0/24"]
}

variable "common_tags" {
  description = "Common tags for all resources"
  type        = map(string)
  default = {
    "Terraform"            = "true"
    "Compliance:Framework" = "SOC2-Type-II"
    "Environment"          = "production"
  }
}

# Outputs
output "vpc_id" {
  description = "ID of the VPC"
  value       = aws_vpc.soc2_vpc.id
}

output "private_subnet_ids" {
  description = "IDs of the private subnets"
  value       = aws_subnet.private_subnets[*].id
}

output "public_subnet_ids" {
  description = "IDs of the public subnets"
  value       = aws_subnet.public_subnets[*].id
}

output "flow_log_group_name" {
  description = "Name of the VPC flow log CloudWatch log group"
  value       = aws_cloudwatch_log_group.vpc_flow_log.name
}

output "soc2_compliance_tags" {
  description = "SOC2 compliance tags applied to resources"
  value = {
    for key, value in var.common_tags : key => value
    if can(regex("^(SOC2|Compliance|Audit):", key))
  }
}
```
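Wiring the module into a root configuration is a one-block affair per environment. A minimal consumption sketch (the directory layout and tag values are illustrative):

```hcl
# environments/production/main.tf (illustrative layout)
module "soc2_vpc" {
  source = "../../terraform/modules/soc2-compliant-vpc"

  environment          = "production"
  vpc_cidr             = "10.0.0.0/16"
  private_subnet_cidrs = ["10.0.1.0/24", "10.0.2.0/24", "10.0.3.0/24"]
  public_subnet_cidrs  = ["10.0.101.0/24", "10.0.102.0/24", "10.0.103.0/24"]

  common_tags = {
    "Terraform"            = "true"
    "Compliance:Framework" = "SOC2-Type-II"
    "Environment"          = "production"
  }
}

# Surface the compliance-relevant outputs for downstream modules and auditors
output "audit_vpc_id" {
  value = module.soc2_vpc.vpc_id
}
```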
**2. SOC2 Application Security Module**
```hcl
# terraform/modules/soc2-application/main.tf
# Note: input variables (application_name, environment, task_cpu, task_memory,
# container_image, container_port, public_subnet_ids, common_tags), the ALB
# security group, and the ECS execution/task IAM roles are defined elsewhere
# in this module and omitted here for brevity.

# SOC2 Security Control: ECS Cluster with Security Configuration (CC6.1)
resource "aws_ecs_cluster" "soc2_cluster" {
  name = "${var.application_name}-${var.environment}"

  setting {
    name  = "containerInsights"
    value = "enabled"
  }

  configuration {
    execute_command_configuration {
      kms_key_id = aws_kms_key.ecs_exec_key.arn
      logging    = "OVERRIDE"

      log_configuration {
        cloud_watch_encryption_enabled = true
        cloud_watch_log_group_name     = aws_cloudwatch_log_group.ecs_exec_logs.name
      }
    }
  }

  tags = merge(var.common_tags, {
    Name                      = "${var.application_name}-${var.environment}-cluster"
    "SOC2:Control"            = "CC6.1"
    "SOC2:Description"        = "Secure container orchestration platform"
    "Audit:ContainerInsights" = "enabled"
  })
}

# SOC2 Security Control: Application Load Balancer with SSL (CC6.7)
resource "aws_lb" "application_lb" {
  name               = "${var.application_name}-${var.environment}-alb"
  internal           = false
  load_balancer_type = "application"
  security_groups    = [aws_security_group.alb_sg.id]
  subnets            = var.public_subnet_ids

  enable_deletion_protection = var.environment == "production"
  enable_http2               = true

  access_logs {
    bucket  = aws_s3_bucket.alb_logs.bucket
    prefix  = "alb-logs"
    enabled = true
  }

  tags = merge(var.common_tags, {
    Name               = "${var.application_name}-${var.environment}-alb"
    "SOC2:Control"     = "CC6.7"
    "SOC2:Description" = "Encrypted application load balancer"
    "Audit:AccessLogs" = "enabled"
  })
}

# SOC2 Security Control: S3 Bucket for ALB Logs (CC7.2)
resource "aws_s3_bucket" "alb_logs" {
  bucket        = "${var.application_name}-${var.environment}-alb-logs-${random_string.bucket_suffix.result}"
  force_destroy = var.environment != "production"

  tags = merge(var.common_tags, {
    Name                   = "${var.application_name}-${var.environment}-alb-logs"
    "SOC2:Control"         = "CC7.2"
    "SOC2:Description"     = "Access logs for application load balancer"
    "Audit:RetentionYears" = "7"
  })
}

resource "aws_s3_bucket_versioning" "alb_logs" {
  bucket = aws_s3_bucket.alb_logs.id

  versioning_configuration {
    status = "Enabled"
  }
}

# ALB log delivery supports only S3-managed encryption (SSE-S3); configuring
# SSE-KMS here would silently block log delivery.
resource "aws_s3_bucket_server_side_encryption_configuration" "alb_logs" {
  bucket = aws_s3_bucket.alb_logs.id

  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm = "AES256"
    }
  }
}

resource "aws_s3_bucket_lifecycle_configuration" "alb_logs" {
  bucket = aws_s3_bucket.alb_logs.id

  rule {
    id     = "soc2_compliance_retention"
    status = "Enabled"

    filter {} # apply to every object in the bucket

    transition {
      days          = 30
      storage_class = "STANDARD_IA"
    }
    transition {
      days          = 90
      storage_class = "GLACIER"
    }
    transition {
      days          = 365
      storage_class = "DEEP_ARCHIVE"
    }
    expiration {
      days = 2557 # 7 years for SOC2 compliance
    }
  }
}

# SOC2 Security Control: ECS Task Definition with Security Context (CC6.1)
resource "aws_ecs_task_definition" "app_task" {
  family                   = "${var.application_name}-${var.environment}"
  network_mode             = "awsvpc"
  requires_compatibilities = ["FARGATE"]
  cpu                      = var.task_cpu
  memory                   = var.task_memory
  execution_role_arn       = aws_iam_role.ecs_execution_role.arn
  task_role_arn            = aws_iam_role.ecs_task_role.arn

  container_definitions = jsonencode([
    {
      name      = var.application_name
      image     = var.container_image
      essential = true

      portMappings = [
        {
          containerPort = var.container_port
          protocol      = "tcp"
        }
      ]

      # SOC2 Security Control: Application Logging (CC7.2)
      logConfiguration = {
        logDriver = "awslogs"
        options = {
          awslogs-group         = aws_cloudwatch_log_group.app_logs.name
          awslogs-region        = data.aws_region.current.name
          awslogs-stream-prefix = "ecs"
        }
      }

      # SOC2 Security Control: Environment Variable Security
      environment = [
        {
          name  = "ENVIRONMENT"
          value = var.environment
        },
        {
          name  = "LOG_LEVEL"
          value = var.environment == "production" ? "INFO" : "DEBUG"
        }
      ]

      # SOC2 Security Control: Secrets Management (CC6.7)
      secrets = [
        {
          name      = "DATABASE_PASSWORD"
          valueFrom = aws_secretsmanager_secret.app_secrets.arn
        }
      ]

      # SOC2 Security Control: Container Security
      readonlyRootFilesystem = true
      user                   = "1001" # Non-root user

      # NOTE: the container image must include curl for this health check
      healthCheck = {
        command     = ["CMD-SHELL", "curl -f http://localhost:${var.container_port}/health || exit 1"]
        interval    = 30
        timeout     = 5
        retries     = 3
        startPeriod = 60
      }
    }
  ])

  tags = merge(var.common_tags, {
    Name                 = "${var.application_name}-${var.environment}-task"
    "SOC2:Control"       = "CC6.1"
    "SOC2:Description"   = "Secure container task definition"
    "Audit:ReadOnlyRoot" = "true"
    "Audit:NonRootUser"  = "true"
  })
}

# SOC2 Security Control: CloudWatch Log Groups with Encryption (CC6.7)
resource "aws_cloudwatch_log_group" "app_logs" {
  name              = "/ecs/${var.application_name}-${var.environment}"
  retention_in_days = 2557 # 7 years for SOC2
  kms_key_id        = aws_kms_key.cloudwatch_key.arn

  tags = merge(var.common_tags, {
    Name                  = "${var.application_name}-${var.environment}-logs"
    "SOC2:Control"        = "CC6.7"
    "SOC2:Description"    = "Encrypted application logs"
    "Audit:RetentionDays" = "2557"
  })
}

resource "aws_cloudwatch_log_group" "ecs_exec_logs" {
  name              = "/ecs/exec/${var.application_name}-${var.environment}"
  retention_in_days = 90 # 90 days for ECS Exec session logs
  kms_key_id        = aws_kms_key.ecs_exec_key.arn

  tags = merge(var.common_tags, {
    Name               = "${var.application_name}-${var.environment}-exec-logs"
    "SOC2:Control"     = "CC6.2"
    "SOC2:Description" = "ECS Exec (interactive container access) logs"
    "Audit:AccessType" = "administrative"
  })
}

# SOC2 Security Control: KMS Keys for Encryption (CC6.7)
# NOTE: keys that encrypt CloudWatch log groups need a key policy granting
# the logs.<region>.amazonaws.com service principal, as shown in the VPC module.
resource "aws_kms_key" "cloudwatch_key" {
  description             = "KMS key for CloudWatch logs encryption"
  deletion_window_in_days = 30
  enable_key_rotation     = true

  tags = merge(var.common_tags, {
    Name                = "${var.application_name}-${var.environment}-cloudwatch-key"
    "SOC2:Control"      = "CC6.7"
    "SOC2:Description"  = "Encryption key for CloudWatch logs"
    "Audit:KeyRotation" = "enabled"
  })
}

resource "aws_kms_key" "ecs_exec_key" {
  description             = "KMS key for ECS Exec encryption"
  deletion_window_in_days = 30
  enable_key_rotation     = true

  tags = merge(var.common_tags, {
    Name               = "${var.application_name}-${var.environment}-ecs-exec-key"
    "SOC2:Control"     = "CC6.7"
    "SOC2:Description" = "Encryption key for ECS Exec sessions"
  })
}

# Retained for buckets that support SSE-KMS (the ALB log bucket cannot use it)
resource "aws_kms_key" "s3_key" {
  description             = "KMS key for S3 encryption"
  deletion_window_in_days = 30
  enable_key_rotation     = true

  tags = merge(var.common_tags, {
    Name               = "${var.application_name}-${var.environment}-s3-key"
    "SOC2:Control"     = "CC6.7"
    "SOC2:Description" = "Encryption key for S3 buckets"
  })
}

# SOC2 Security Control: Secrets Manager (CC6.7)
resource "aws_secretsmanager_secret" "app_secrets" {
  name                    = "${var.application_name}-${var.environment}-secrets"
  description             = "Application secrets for ${var.application_name}"
  kms_key_id              = aws_kms_key.secrets_key.arn
  recovery_window_in_days = var.environment == "production" ? 30 : 0

  tags = merge(var.common_tags, {
    Name                    = "${var.application_name}-${var.environment}-secrets"
    "SOC2:Control"          = "CC6.7"
    "SOC2:Description"      = "Encrypted secrets management"
    "Audit:SecretRotation"  = "enabled"
  })
}

resource "aws_kms_key" "secrets_key" {
  description             = "KMS key for Secrets Manager"
  deletion_window_in_days = 30
  enable_key_rotation     = true

  tags = merge(var.common_tags, {
    Name               = "${var.application_name}-${var.environment}-secrets-key"
    "SOC2:Control"     = "CC6.7"
    "SOC2:Description" = "Encryption key for secrets"
  })
}

# Random string for unique bucket naming
resource "random_string" "bucket_suffix" {
  length  = 8
  special = false
  upper   = false
}

# Data sources
data "aws_region" "current" {}
data "aws_caller_identity" "current" {}
```
## Automated SOC2 Evidence Collection

### Continuous Compliance Monitoring System

**1. SOC2 Evidence Collection Automation**
```python
#!/usr/bin/env python3
# soc2-automation/evidence_collector.py
import hashlib
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional

import boto3


class SOC2EvidenceCollector:
    def __init__(self, aws_profile: Optional[str] = None):
        self.session = boto3.Session(profile_name=aws_profile)
        self.logger = logging.getLogger(__name__)

        # Initialize AWS clients
        self.cloudtrail = self.session.client('cloudtrail')
        self.cloudwatch = self.session.client('cloudwatch')
        self.logs = self.session.client('logs')
        self.config = self.session.client('config')
        self.iam = self.session.client('iam')
        self.s3 = self.session.client('s3')

        # SOC2 Trust Service Criteria mapping (Common Criteria series)
        self.trust_criteria = {
            'CC1': 'Control Environment',
            'CC2': 'Communication and Information',
            'CC3': 'Risk Assessment',
            'CC4': 'Monitoring Activities',
            'CC5': 'Control Activities',
            'CC6': 'Logical and Physical Access Controls',
            'CC7': 'System Operations',
            'CC8': 'Change Management',
            'CC9': 'Risk Mitigation'
        }

    def collect_comprehensive_evidence(self, start_date: datetime,
                                       end_date: datetime) -> Dict:
        """Collect comprehensive SOC2 Type II evidence."""
        evidence_package = {
            'collection_metadata': {
                'collection_date': datetime.now().isoformat(),
                'evidence_period_start': start_date.isoformat(),
                'evidence_period_end': end_date.isoformat(),
                'collector_version': '2.0',
                'aws_account_id': self.session.client('sts').get_caller_identity()['Account']
            },
            'control_evidence': {}
        }

        # Collect evidence for each Trust Service Criteria
        for criteria_id, description in self.trust_criteria.items():
            self.logger.info(f"Collecting evidence for {criteria_id}: {description}")
            evidence_package['control_evidence'][criteria_id] = \
                self._collect_criteria_evidence(criteria_id, start_date, end_date)

        # Generate evidence integrity hashes
        evidence_package['integrity'] = self._generate_evidence_integrity(evidence_package)
        return evidence_package

    def _collect_criteria_evidence(self, criteria_id: str,
                                   start_date: datetime, end_date: datetime) -> Dict:
        """Collect evidence for a specific Trust Service Criteria."""
        evidence = {
            'criteria_id': criteria_id,
            'criteria_description': self.trust_criteria[criteria_id],
            'evidence_items': [],
            'metrics': {},
            'compliance_status': 'PENDING'
        }

        # CC3-CC5 and CC9 evidence typically lives in GRC tooling rather
        # than AWS APIs, so those criteria collect no automated items here.
        if criteria_id == 'CC1':  # Control Environment
            evidence['evidence_items'].extend(self._collect_cc1_evidence(start_date, end_date))
        elif criteria_id == 'CC2':  # Communication and Information
            evidence['evidence_items'].extend(self._collect_cc2_evidence(start_date, end_date))
        elif criteria_id == 'CC6':  # Logical and Physical Access Controls
            evidence['evidence_items'].extend(self._collect_cc6_evidence(start_date, end_date))
        elif criteria_id == 'CC7':  # System Operations
            evidence['evidence_items'].extend(self._collect_cc7_evidence(start_date, end_date))
        elif criteria_id == 'CC8':  # Change Management
            evidence['evidence_items'].extend(self._collect_cc8_evidence(start_date, end_date))

        # Calculate compliance metrics
        evidence['metrics'] = self._calculate_compliance_metrics(evidence['evidence_items'])
        evidence['compliance_status'] = self._determine_compliance_status(evidence['metrics'])
        return evidence

    def _collect_cc1_evidence(self, start_date: datetime, end_date: datetime) -> List[Dict]:
        """Collect CC1 (Control Environment) evidence."""
        evidence_items = []

        # CC1.1: Management establishes structures, reporting lines, and authorities
        org_policies = self._get_iam_policies_evidence()
        evidence_items.append({
            'control_id': 'CC1.1',
            'control_description': 'Organizational structure and authority',
            'evidence_type': 'iam_policies',
            'evidence_data': org_policies,
            'collection_timestamp': datetime.now().isoformat(),
            'automated': True
        })

        # CC1.2: Board of directors and management establish oversight responsibilities
        governance_evidence = self._get_governance_evidence(start_date, end_date)
        evidence_items.append({
            'control_id': 'CC1.2',
            'control_description': 'Governance and oversight',
            'evidence_type': 'governance_activities',
            'evidence_data': governance_evidence,
            'collection_timestamp': datetime.now().isoformat(),
            'automated': True
        })
        return evidence_items

    def _collect_cc2_evidence(self, start_date: datetime, end_date: datetime) -> List[Dict]:
        """Collect CC2 (Communication and Information) evidence."""
        # CC2.1: Inventory the log groups that carry the system's information trail
        log_groups = self.logs.describe_log_groups()
        return [{
            'control_id': 'CC2.1',
            'control_description': 'Information capture and communication',
            'evidence_type': 'log_group_inventory',
            'evidence_data': {
                'log_groups_count': len(log_groups['logGroups']),
                'log_groups_sample': [lg['logGroupName'] for lg in log_groups['logGroups'][:20]]
            },
            'collection_timestamp': datetime.now().isoformat(),
            'automated': True
        }]

    def _collect_cc6_evidence(self, start_date: datetime, end_date: datetime) -> List[Dict]:
        """Collect CC6 (Logical and Physical Access Controls) evidence."""
        evidence_items = []

        # CC6.1: Access controls restrict unauthorized access
        access_controls = self._get_access_control_evidence(start_date, end_date)
        evidence_items.append({
            'control_id': 'CC6.1',
            'control_description': 'Logical access controls',
            'evidence_type': 'access_controls',
            'evidence_data': access_controls,
            'collection_timestamp': datetime.now().isoformat(),
            'automated': True
        })

        # CC6.2: Privileged access is restricted and monitored
        privileged_access = self._get_privileged_access_evidence(start_date, end_date)
        evidence_items.append({
            'control_id': 'CC6.2',
            'control_description': 'Privileged access monitoring',
            'evidence_type': 'privileged_access_logs',
            'evidence_data': privileged_access,
            'collection_timestamp': datetime.now().isoformat(),
            'automated': True
        })

        # CC6.7: Data transmission and disposal controls
        encryption_evidence = self._get_encryption_evidence()
        evidence_items.append({
            'control_id': 'CC6.7',
            'control_description': 'Data transmission and encryption',
            'evidence_type': 'encryption_configuration',
            'evidence_data': encryption_evidence,
            'collection_timestamp': datetime.now().isoformat(),
            'automated': True
        })
        return evidence_items

    def _collect_cc7_evidence(self, start_date: datetime, end_date: datetime) -> List[Dict]:
        """Collect CC7 (System Operations) evidence."""
        evidence_items = []

        # CC7.1: System capacity and performance monitoring
        performance_monitoring = self._get_performance_monitoring_evidence(start_date, end_date)
        evidence_items.append({
            'control_id': 'CC7.1',
            'control_description': 'Performance and capacity monitoring',
            'evidence_type': 'performance_metrics',
            'evidence_data': performance_monitoring,
            'collection_timestamp': datetime.now().isoformat(),
            'automated': True
        })

        # CC7.2: System monitoring for security events
        security_monitoring = self._get_security_monitoring_evidence(start_date, end_date)
        evidence_items.append({
            'control_id': 'CC7.2',
            'control_description': 'Security event monitoring',
            'evidence_type': 'security_logs',
            'evidence_data': security_monitoring,
            'collection_timestamp': datetime.now().isoformat(),
            'automated': True
        })
        return evidence_items

    def _collect_cc8_evidence(self, start_date: datetime, end_date: datetime) -> List[Dict]:
        """Collect CC8 (Change Management) evidence."""
        change_management = self._get_change_management_evidence(start_date, end_date)
        return [{
            'control_id': 'CC8.1',
            'control_description': 'Change management process',
            'evidence_type': 'infrastructure_changes',
            'evidence_data': change_management,
            'collection_timestamp': datetime.now().isoformat(),
            'automated': True
        }]

    @staticmethod
    def _event_detail(event: Dict) -> Dict:
        """Parse the raw CloudTrail JSON carried in lookup_events results."""
        try:
            return json.loads(event.get('CloudTrailEvent', '{}'))
        except json.JSONDecodeError:
            return {}

    def _get_iam_policies_evidence(self) -> Dict:
        """Summarize customer-managed IAM policies (organizational-structure evidence)."""
        policies = self.iam.list_policies(Scope='Local')  # first page is enough for a sample
        return {
            'customer_managed_policies': len(policies['Policies']),
            'policy_sample': [
                {
                    'policy_name': policy['PolicyName'],
                    'create_date': policy['CreateDate'].isoformat(),
                    'attachment_count': policy['AttachmentCount']
                }
                for policy in policies['Policies'][:20]
            ]
        }

    def _get_governance_evidence(self, start_date: datetime, end_date: datetime) -> Dict:
        """Collect governance evidence such as the account password policy."""
        try:
            password_policy = self.iam.get_account_password_policy()['PasswordPolicy']
        except self.iam.exceptions.NoSuchEntityException:
            password_policy = {}
        return {
            'account_password_policy': password_policy,
            'evidence_period': f"{start_date.isoformat()} to {end_date.isoformat()}"
        }

    def _get_access_control_evidence(self, start_date: datetime, end_date: datetime) -> Dict:
        """Collect access control evidence from CloudTrail."""
        # lookup_events returns at most 50 events per call, so paginate
        # up to 1,000 role-assumption events.
        paginator = self.cloudtrail.get_paginator('lookup_events')
        raw_events = []
        for page in paginator.paginate(
            LookupAttributes=[{'AttributeKey': 'EventName', 'AttributeValue': 'AssumeRole'}],
            StartTime=start_date,
            EndTime=end_date,
            PaginationConfig={'MaxItems': 1000}
        ):
            raw_events.extend(page.get('Events', []))

        access_events = []
        for event in raw_events:
            # Identity, source IP, and user agent live inside the raw
            # CloudTrail JSON, not at the top level of lookup_events results.
            detail = self._event_detail(event)
            access_events.append({
                'event_time': event['EventTime'].isoformat(),
                'event_name': event['EventName'],
                'user_identity': detail.get('userIdentity', {}),
                'source_ip': detail.get('sourceIPAddress'),
                'user_agent': detail.get('userAgent'),
                'resources': event.get('Resources', [])
            })

        # Get current IAM configuration (first page is enough for a sample)
        iam_users = self.iam.list_users()
        iam_roles = self.iam.list_roles()

        return {
            'access_events_count': len(access_events),
            'access_events_sample': access_events[:50],  # First 50 for audit
            'iam_users_count': len(iam_users['Users']),
            'iam_roles_count': len(iam_roles['Roles']),
            'user_summary': [
                {
                    'username': user['UserName'],
                    'create_date': user['CreateDate'].isoformat(),
                    'password_last_used': user['PasswordLastUsed'].isoformat()
                    if user.get('PasswordLastUsed') else 'Never'
                }
                for user in iam_users['Users']
            ],
            'role_summary': [
                {
                    'role_name': role['RoleName'],
                    'create_date': role['CreateDate'].isoformat(),
                    'assume_role_policy': role['AssumeRolePolicyDocument']
                }
                for role in iam_roles['Roles']
            ]
        }

    def _get_privileged_access_evidence(self, start_date: datetime, end_date: datetime) -> Dict:
        """Collect privileged access evidence (first page, ~50 events, per query)."""
        privileged_events = self.cloudtrail.lookup_events(
            LookupAttributes=[{'AttributeKey': 'EventName', 'AttributeValue': 'CreateUser'}],
            StartTime=start_date,
            EndTime=end_date
        )
        # Also check for administrative console access
        console_events = self.cloudtrail.lookup_events(
            LookupAttributes=[{'AttributeKey': 'EventName', 'AttributeValue': 'ConsoleLogin'}],
            StartTime=start_date,
            EndTime=end_date
        )

        def summarize(event: Dict, include_mfa: bool = False) -> Dict:
            detail = self._event_detail(event)
            summary = {
                'event_time': event['EventTime'].isoformat(),
                'event_name': event['EventName'],
                'user_identity': detail.get('userIdentity', {}),
                'source_ip': detail.get('sourceIPAddress')
            }
            if include_mfa:
                # ConsoleLogin events report MFA usage under additionalEventData
                summary['mfa_used'] = detail.get('additionalEventData', {}).get('MFAUsed', 'No')
            return summary

        return {
            'privileged_operations': len(privileged_events.get('Events', [])),
            'console_logins': len(console_events.get('Events', [])),
            'privileged_events_sample': [
                summarize(event) for event in privileged_events.get('Events', [])[:20]
            ],
            'console_login_sample': [
                summarize(event, include_mfa=True)
                for event in console_events.get('Events', [])[:20]
            ]
        }

    def _get_encryption_evidence(self) -> Dict:
        """Collect encryption configuration evidence."""
        kms_client = self.session.client('kms')
        keys = kms_client.list_keys()

        encryption_evidence = {
            'kms_keys_count': len(keys['Keys']),
            'kms_keys_details': []
        }

        for key in keys['Keys'][:20]:  # Sample first 20
            try:
                key_details = kms_client.describe_key(KeyId=key['KeyId'])
                key_rotation = kms_client.get_key_rotation_status(KeyId=key['KeyId'])
                creation_date = key_details['KeyMetadata'].get('CreationDate')
                encryption_evidence['kms_keys_details'].append({
                    'key_id': key['KeyId'],
                    'key_arn': key['KeyArn'],
                    'description': key_details['KeyMetadata'].get('Description', ''),
                    'key_usage': key_details['KeyMetadata'].get('KeyUsage', ''),
                    'key_state': key_details['KeyMetadata'].get('KeyState', ''),
                    'creation_date': creation_date.isoformat() if creation_date else '',
                    'rotation_enabled': key_rotation.get('KeyRotationEnabled', False)
                })
            except Exception as e:
                # AWS-managed keys reject get_key_rotation_status, among other cases
                self.logger.warning(f"Could not get details for key {key['KeyId']}: {e}")

        # Get S3 bucket encryption
        s3_buckets = self.s3.list_buckets()
        bucket_encryption = []
        for bucket in s3_buckets['Buckets'][:10]:  # Sample first 10
            try:
                encryption = self.s3.get_bucket_encryption(Bucket=bucket['Name'])
                default = encryption['ServerSideEncryptionConfiguration']['Rules'][0][
                    'ApplyServerSideEncryptionByDefault']
                bucket_encryption.append({
                    'bucket_name': bucket['Name'],
                    'encryption_algorithm': default['SSEAlgorithm'],
                    'kms_key_id': default.get('KMSMasterKeyID', 'Default')
                })
            except Exception:
                # Buckets without an explicit encryption configuration
                bucket_encryption.append({
                    'bucket_name': bucket['Name'],
                    'encryption_algorithm': 'None',
                    'kms_key_id': 'None'
                })

        encryption_evidence['s3_bucket_encryption'] = bucket_encryption
        return encryption_evidence

    def _get_performance_monitoring_evidence(self, start_date: datetime, end_date: datetime) -> Dict:
        """Summarize CloudWatch alarm coverage as capacity/performance evidence."""
        alarms = self.cloudwatch.describe_alarms()
        return {
            'metric_alarms_count': len(alarms['MetricAlarms']),
            'alarm_sample': [
                {
                    'alarm_name': alarm['AlarmName'],
                    'metric_name': alarm.get('MetricName'),
                    'state': alarm['StateValue']
                }
                for alarm in alarms['MetricAlarms'][:20]
            ]
        }

    def _get_security_monitoring_evidence(self, start_date: datetime, end_date: datetime) -> Dict:
        """Collect security monitoring evidence."""
        # Get CloudWatch alarms and log groups used for security monitoring
        alarms = self.cloudwatch.describe_alarms()
        log_groups = self.logs.describe_log_groups()

        # Get security-related CloudTrail events (first page, ~50 events)
        security_events = self.cloudtrail.lookup_events(
            LookupAttributes=[{'AttributeKey': 'EventName', 'AttributeValue': 'CreateSecurityGroup'}],
            StartTime=start_date,
            EndTime=end_date
        )

        return {
            'cloudwatch_alarms_count': len(alarms['MetricAlarms']),
            'security_log_groups': [
                lg['logGroupName'] for lg in log_groups['logGroups']
                if any(keyword in lg['logGroupName'].lower()
                       for keyword in ['security', 'auth', 'access', 'audit'])
            ],
            'security_events_count': len(security_events.get('Events', [])),
            'monitoring_metrics': {
                'alarms_in_alarm_state': len([a for a in alarms['MetricAlarms'] if a['StateValue'] == 'ALARM']),
                'alarms_ok_state': len([a for a in alarms['MetricAlarms'] if a['StateValue'] == 'OK']),
                'log_groups_with_retention': len([lg for lg in log_groups['logGroups'] if lg.get('retentionInDays')])
            }
        }

    def _get_change_management_evidence(self, start_date: datetime, end_date: datetime) -> Dict:
        """Collect change management evidence."""
        # Get CloudFormation stack events
        cf_client = self.session.client('cloudformation')
        stacks = cf_client.list_stacks(StackStatusFilter=['CREATE_COMPLETE', 'UPDATE_COMPLETE'])

        change_events = []
        for stack in stacks['StackSummaries'][:20]:  # Sample first 20
            try:
                events = cf_client.describe_stack_events(StackName=stack['StackName'])
                stack_changes = [
                    {
                        'stack_name': stack['StackName'],
                        'event_id': event['EventId'],
                        'timestamp': event['Timestamp'].isoformat(),
                        'resource_type': event.get('ResourceType'),
                        'resource_status': event.get('ResourceStatus'),
                        'resource_status_reason': event.get('ResourceStatusReason')
                    }
                    for event in events['StackEvents']
                    if start_date <= event['Timestamp'].replace(tzinfo=None) <= end_date
                ]
                change_events.extend(stack_changes)
            except Exception as e:
                self.logger.warning(f"Could not get events for stack {stack['StackName']}: {e}")

        # Get EC2 instance launches
        ec2_events = self.cloudtrail.lookup_events(
            LookupAttributes=[{'AttributeKey': 'EventName', 'AttributeValue': 'RunInstances'}],
            StartTime=start_date,
            EndTime=end_date
        )

        return {
            'cloudformation_changes': len(change_events),
            'ec2_instance_launches': len(ec2_events.get('Events', [])),
            'change_events_sample': change_events[:50],
            'infrastructure_change_summary': {
                'total_changes': len(change_events) + len(ec2_events.get('Events', [])),
                'cloudformation_stacks': len(stacks['StackSummaries']),
                'change_frequency_per_day': len(change_events) / max((end_date - start_date).days, 1)
            }
        }

    def _calculate_compliance_metrics(self, evidence_items: List[Dict]) -> Dict:
        """Calculate compliance metrics from collected evidence."""
        automated = len([item for item in evidence_items if item.get('automated', False)])
        metrics = {
            'total_controls': len(evidence_items),
            'automated_controls': automated,
            'manual_controls': len(evidence_items) - automated,
            'evidence_completeness': 0,
            'control_effectiveness': 0
        }

        # Evidence completeness: share of items that actually carry data
        complete_evidence = len([item for item in evidence_items if item.get('evidence_data')])
        metrics['evidence_completeness'] = (
            complete_evidence / len(evidence_items) * 100 if evidence_items else 0
        )

        # Control effectiveness: a crude proxy that rewards richer evidence
        # payloads, capped at 100
        effectiveness_scores = []
        for item in evidence_items:
            evidence_data = item.get('evidence_data', {})
            if isinstance(evidence_data, dict):
                score = min(100, len(str(evidence_data)) / 100)
                effectiveness_scores.append(score)
        metrics['control_effectiveness'] = (
            sum(effectiveness_scores) / len(effectiveness_scores) if effectiveness_scores else 0
        )
        return metrics

    def _determine_compliance_status(self, metrics: Dict) -> str:
        """Determine overall compliance status."""
        completeness = metrics.get('evidence_completeness', 0)
        effectiveness = metrics.get('control_effectiveness', 0)

        if completeness >= 95 and effectiveness >= 80:
            return 'COMPLIANT'
        elif completeness >= 85 and effectiveness >= 70:
            return 'MOSTLY_COMPLIANT'
        elif completeness >= 70:
            return 'PARTIALLY_COMPLIANT'
        return 'NON_COMPLIANT'

    def _generate_evidence_integrity(self, evidence_package: Dict) -> Dict:
        """Generate an integrity hash for the evidence package."""
        # default=str keeps non-JSON types (e.g. datetimes) serializable
        evidence_string = json.dumps(evidence_package['control_evidence'],
                                     sort_keys=True, default=str)
        evidence_hash = hashlib.sha256(evidence_string.encode()).hexdigest()
        return {
            'evidence_hash': evidence_hash,
            'collection_timestamp': datetime.now().isoformat(),
            'integrity_algorithm': 'SHA256',
            'evidence_size_bytes': len(evidence_string)
        }

    def generate_audit_report(self, evidence_package: Dict) -> str:
        """Generate a human-readable audit report in Markdown."""
        meta = evidence_package['collection_metadata']
        report = f"""
# SOC2 Type II Evidence Collection Report

Generated: {meta['collection_date']}

## Collection Summary

- **Evidence Period**: {meta['evidence_period_start']} to {meta['evidence_period_end']}
- **AWS Account**: {meta['aws_account_id']}
- **Collection Method**: Automated
- **Evidence Integrity Hash**: {evidence_package['integrity']['evidence_hash']}

## Control Evidence Summary
"""
        for criteria_id, evidence in evidence_package['control_evidence'].items():
            report += f"""
### {criteria_id}: {evidence['criteria_description']}

- **Compliance Status**: {evidence['compliance_status']}
- **Evidence Items**: {len(evidence['evidence_items'])}
- **Evidence Completeness**: {evidence['metrics']['evidence_completeness']:.1f}%
- **Control Effectiveness**: {evidence['metrics']['control_effectiveness']:.1f}%
"""
            for item in evidence['evidence_items']:
                report += f"""
#### {item['control_id']}: {item['control_description']}

- **Evidence Type**: {item['evidence_type']}
- **Collection Method**: {'Automated' if item['automated'] else 'Manual'}
- **Timestamp**: {item['collection_timestamp']}
"""
        return report


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Example usage: collect evidence for the last 90 days
    collector = SOC2EvidenceCollector()
    end_date = datetime.now()
    start_date = end_date - timedelta(days=90)
    evidence = collector.collect_comprehensive_evidence(start_date, end_date)

    # Save evidence package
    with open('soc2-evidence-package.json', 'w') as f:
        json.dump(evidence, f, indent=2, default=str)

    # Generate audit report
    report = collector.generate_audit_report(evidence)
    with open('soc2-audit-report.md', 'w') as f:
        f.write(report)

    print("SOC2 evidence collection completed!")
    print("Evidence package: soc2-evidence-package.json")
    print("Audit report: soc2-audit-report.md")
```
**2. Automated Compliance Dashboard**
```python
#!/usr/bin/env python3
# soc2-automation/compliance_dashboard.py
from datetime import datetime, timedelta
from typing import Dict

import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import streamlit as st

# Assumes evidence_collector.py (previous section) is on the import path
from evidence_collector import SOC2EvidenceCollector


class SOC2ComplianceDashboard:
    def __init__(self):
        self.evidence_collector = SOC2EvidenceCollector()

    def create_dashboard(self):
        """Create Streamlit dashboard for SOC2 compliance monitoring."""
        st.set_page_config(
            page_title="SOC2 Type II Compliance Dashboard",
            page_icon="🔒",
            layout="wide"
        )
        st.title("🔒 SOC2 Type II Compliance Dashboard")
        st.markdown("Real-time compliance monitoring and evidence collection")

        # Sidebar controls
        st.sidebar.header("Dashboard Controls")

        # Date range selector
        col1, col2 = st.sidebar.columns(2)
        with col1:
            start_date = st.date_input("Start Date", datetime.now() - timedelta(days=90))
        with col2:
            end_date = st.date_input("End Date", datetime.now())

        # Refresh data button
        if st.sidebar.button("Refresh Evidence"):
            with st.spinner("Collecting evidence..."):
                evidence = self.evidence_collector.collect_comprehensive_evidence(
                    datetime.combine(start_date, datetime.min.time()),
                    datetime.combine(end_date, datetime.min.time())
                )
            st.session_state['evidence'] = evidence

        # Load evidence data on first render
        if 'evidence' not in st.session_state:
            with st.spinner("Loading initial evidence..."):
                evidence = self.evidence_collector.collect_comprehensive_evidence(
                    datetime.combine(start_date, datetime.min.time()),
                    datetime.combine(end_date, datetime.min.time())
                )
            st.session_state['evidence'] = evidence

        evidence = st.session_state['evidence']

        # Main dashboard
        self._create_overview_section(evidence)
        self._create_compliance_metrics(evidence)
        self._create_control_details(evidence)
        self._create_evidence_timeline(evidence)

    def _create_overview_section(self, evidence: Dict):
        """Create overview section with key metrics."""
        st.header("📊 Compliance Overview")

        # Calculate overall metrics
        total_controls = sum(len(ctrl['evidence_items'])
                             for ctrl in evidence['control_evidence'].values())
        automated_controls = sum(
            len([item for item in ctrl['evidence_items'] if item.get('automated', False)])
            for ctrl in evidence['control_evidence'].values()
        )
        overall_completeness = sum(
            ctrl['metrics']['evidence_completeness']
            for ctrl in evidence['control_evidence'].values()
        ) / len(evidence['control_evidence'])
        overall_effectiveness = sum(
            ctrl['metrics']['control_effectiveness']
            for ctrl in evidence['control_evidence'].values()
        ) / len(evidence['control_evidence'])

        # Display key metrics (deltas measured against the 85% / 80% thresholds)
        col1, col2, col3, col4 = st.columns(4)
        with col1:
            st.metric(
                label="Overall Compliance",
                value=f"{overall_completeness:.1f}%",
                delta=f"{overall_completeness - 85:.1f}%"
            )
        with col2:
            st.metric(
                label="Control Effectiveness",
                value=f"{overall_effectiveness:.1f}%",
                delta=f"{overall_effectiveness - 80:.1f}%"
            )
        with col3:
            st.metric(
                label="Total Controls",
                value=total_controls,
                delta=f"🤖 {automated_controls} automated"
            )
        with col4:
            period_days = (
                datetime.fromisoformat(evidence['collection_metadata']['evidence_period_end'])
                - datetime.fromisoformat(evidence['collection_metadata']['evidence_period_start'])
            ).days
            st.metric(
                label="Evidence Period",
                value=f"{period_days} days",
                delta="Continuous collection"
            )

    def _create_compliance_metrics(self, evidence: Dict):
        """Create compliance metrics visualization."""
        st.header("📈 Trust Service Criteria Compliance")

        # Prepare data for visualization
        criteria_data = []
        for criteria_id, ctrl_evidence in evidence['control_evidence'].items():
            criteria_data.append({
                'Criteria': criteria_id,
                'Description': ctrl_evidence['criteria_description'],
                'Completeness': ctrl_evidence['metrics']['evidence_completeness'],
                'Effectiveness': ctrl_evidence['metrics']['control_effectiveness'],
                'Status': ctrl_evidence['compliance_status'],
                'Controls': len(ctrl_evidence['evidence_items'])
            })
        df = pd.DataFrame(criteria_data)

        # Create compliance heatmap
        fig_heatmap = px.imshow(
            df[['Completeness', 'Effectiveness']].T,
            labels=dict(x="Trust Service Criteria", y="Metrics", color="Score"),
            x=df['Criteria'],
            y=['Evidence Completeness', 'Control Effectiveness'],
            color_continuous_scale='RdYlGn',
            aspect="auto"
        )
        fig_heatmap.update_layout(title="Compliance Heatmap by Trust Service Criteria")
        st.plotly_chart(fig_heatmap, use_container_width=True)

        # Compliance status distribution
        col1, col2 = st.columns(2)
        with col1:
            status_counts = df['Status'].value_counts()
            fig_pie = px.pie(
                values=status_counts.values,
                names=status_counts.index,
                title="Compliance Status Distribution"
            )
            st.plotly_chart(fig_pie, use_container_width=True)
        with col2:
            fig_bar = px.bar(
                df,
                x='Criteria',
                y=['Completeness', 'Effectiveness'],
                title="Compliance Scores by Criteria",
                barmode='group'
            )
            st.plotly_chart(fig_bar, use_container_width=True)

    def _create_control_details(self, evidence: Dict):
        """Create detailed control information."""
        st.header("🔍 Control Details")

        # Allow users to select criteria
        selected_criteria = st.selectbox(
            "Select Trust Service Criteria",
            options=list(evidence['control_evidence'].keys()),
            format_func=lambda x: f"{x}: {evidence['control_evidence'][x]['criteria_description']}"
        )

        if selected_criteria:
            ctrl_evidence = evidence['control_evidence'][selected_criteria]

            # Display criteria information
            col1, col2 = st.columns(2)
            with col1:
                st.subheader(f"{selected_criteria}: {ctrl_evidence['criteria_description']}")
                st.write(f"**Compliance Status:** {ctrl_evidence['compliance_status']}")
                st.write(f"**Evidence Items:** {len(ctrl_evidence['evidence_items'])}")
                st.write(f"**Evidence Completeness:** {ctrl_evidence['metrics']['evidence_completeness']:.1f}%")
                st.write(f"**Control Effectiveness:** {ctrl_evidence['metrics']['control_effectiveness']:.1f}%")
            with col2:
                # Control effectiveness gauge
                fig_gauge = go.Figure(go.Indicator(
                    mode="gauge+number",
                    value=ctrl_evidence['metrics']['control_effectiveness'],
                    domain={'x': [0, 1], 'y': [0, 1]},
                    title={'text': "Control Effectiveness"},
                    gauge={
                        'axis': {'range': [None, 100]},
                        'bar': {'color': "darkblue"},
                        'steps': [
                            {'range': [0, 50], 'color': "lightgray"},
                            {'range': [50, 80], 'color': "yellow"},
                            {'range': [80, 100], 'color': "green"}
                        ],
                        'threshold': {
                            'line': {'color': "red", 'width': 4},
                            'thickness': 0.75,
                            'value': 90
                        }
                    }
                ))
                st.plotly_chart(fig_gauge, use_container_width=True)

            # Display evidence items
            st.subheader("Evidence Items")
            evidence_df = pd.DataFrame([
                {
                    'Control ID': item['control_id'],
                    'Description': item['control_description'],
                    'Evidence Type': item['evidence_type'],
                    'Automated': '✅' if item['automated'] else '❌',
                    'Collection Time': item['collection_timestamp']
                }
                for item in ctrl_evidence['evidence_items']
            ])
            st.dataframe(evidence_df, use_container_width=True)

            # Show evidence data for selected control
            selected_control = st.selectbox(
                "View evidence data for control:",
                options=[item['control_id'] for item in ctrl_evidence['evidence_items']]
            )
            if selected_control:
                control_item = next(
                    item for item in ctrl_evidence['evidence_items']
                    if item['control_id'] == selected_control
                )
                with st.expander(f"Evidence Data for {selected_control}"):
                    st.json(control_item['evidence_data'])

    def _create_evidence_timeline(self, evidence: Dict):
        """Create evidence collection timeline."""
        st.header("📅 Evidence Collection Timeline")

        # Prepare timeline data
        timeline_data = []
        for criteria_id, ctrl_evidence in evidence['control_evidence'].items():
            for item in ctrl_evidence['evidence_items']:
                timeline_data.append({
                    'timestamp': datetime.fromisoformat(item['collection_timestamp']),
                    'criteria': criteria_id,
                    'control_id': item['control_id'],
                    'evidence_type': item['evidence_type'],
                    'automated': item['automated']
                })

        if timeline_data:
            df_timeline = pd.DataFrame(timeline_data)

            # Create timeline chart
            fig_timeline = px.scatter(
                df_timeline,
                x='timestamp',
                y='criteria',
                color='evidence_type',
                symbol='automated',
                title="Evidence Collection Timeline",
                hover_data=['control_id', 'evidence_type']
            )
            fig_timeline.update_layout(height=400)
            st.plotly_chart(fig_timeline, use_container_width=True)

        # Evidence collection summary
        st.subheader("Collection Summary")
        col1, col2 = st.columns(2)
        with col1:
            st.write(f"**Collection Date:** {evidence['collection_metadata']['collection_date']}")
            st.write(f"**AWS Account:** {evidence['collection_metadata']['aws_account_id']}")
            st.write(f"**Collector Version:** {evidence['collection_metadata']['collector_version']}")
        with col2:
            st.write(f"**Evidence Hash:** {evidence['integrity']['evidence_hash'][:16]}...")
            st.write(f"**Evidence Size:** {evidence['integrity']['evidence_size_bytes']:,} bytes")
            st.write(f"**Integrity Algorithm:** {evidence['integrity']['integrity_algorithm']}")


if __name__ == "__main__":
    dashboard = SOC2ComplianceDashboard()
    dashboard.create_dashboard()
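```

Streamlit executes the script directly, so serving the dashboard locally is a one-liner (assuming the file layout used above and that AWS credentials are available in the environment):

```bash
pip install streamlit plotly pandas boto3
streamlit run soc2-automation/compliance_dashboard.py
```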
## Continuous Compliance Automation

### CI/CD Integration for SOC2 Compliance

**1. GitHub Actions SOC2 Compliance Workflow**
```yaml
# .github/workflows/soc2-compliance.yml
name: SOC2 Compliance Validation

on:
  schedule:
    - cron: '0 0 * * *'  # Daily compliance check
  push:
    branches: [main]
    paths: ['infrastructure/**', 'terraform/**']
  workflow_dispatch:
    inputs:
      evidence_period_days:
        description: 'Evidence collection period in days'
        required: false
        default: '30'

env:
  AWS_DEFAULT_REGION: us-east-1
  TERRAFORM_VERSION: '1.6.6'

jobs:
  infrastructure-compliance:
    name: Infrastructure Compliance Validation
    runs-on: ubuntu-latest
    permissions:
      id-token: write
      contents: read
      security-events: write
    steps:
      - name: Checkout Code
        uses: actions/checkout@v4

      - name: Configure AWS Credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: ${{ secrets.AWS_OIDC_ROLE }}
          aws-region: ${{ env.AWS_DEFAULT_REGION }}

      - name: Setup Terraform
        uses: hashicorp/setup-terraform@v3
        with:
          terraform_version: ${{ env.TERRAFORM_VERSION }}

      - name: Validate SOC2 Terraform Configurations
        run: |
          # Validate all Terraform configurations for SOC2 compliance
          find infrastructure/ -name "*.tf" -type f | while read -r tf_file; do
            echo "Validating SOC2 compliance for: $tf_file"
            # Check for required SOC2 tags
            if ! grep -q "SOC2:Control" "$tf_file"; then
              echo "❌ Missing SOC2:Control tag in $tf_file"
              exit 1
            fi
            # Check for encryption configurations
            if grep -q "aws_s3_bucket\|aws_ebs_volume\|aws_rds" "$tf_file"; then
              if ! grep -q "kms_key_id\|encryption" "$tf_file"; then
                echo "⚠️ Potential encryption issue in $tf_file"
              fi
            fi
            # Check for logging configurations
            if grep -q "aws_vpc\|aws_lb" "$tf_file"; then
              if ! grep -q "flow_log\|access_logs" "$tf_file"; then
                echo "⚠️ Potential logging issue in $tf_file"
              fi
            fi
          done

      - name: Terraform Plan with SOC2 Validation
        run: |
          cd infrastructure/
          terraform init
          terraform plan -out=tfplan.binary
          terraform show -json tfplan.binary > tfplan.json

      - name: SOC2 Compliance Scanning
        run: |
          # Install dependencies for the evidence collector
          pip install boto3
          # Run SOC2 evidence collection
          python3 << 'EOF'
          import json
          import os
          import sys
          sys.path.append('soc2-automation/')
          from evidence_collector import SOC2EvidenceCollector
          from datetime import datetime, timedelta

          # Collect evidence
          collector = SOC2EvidenceCollector()
          evidence_days = int(os.getenv('EVIDENCE_PERIOD_DAYS', '30'))
          end_date = datetime.now()
          start_date = end_date - timedelta(days=evidence_days)
          evidence = collector.collect_comprehensive_evidence(start_date, end_date)

          # Generate compliance report
          report = collector.generate_audit_report(evidence)

          # Save artifacts
          with open('soc2-evidence.json', 'w') as f:
              json.dump(evidence, f, indent=2, default=str)
          with open('soc2-compliance-report.md', 'w') as f:
              f.write(report)

          # Check compliance thresholds
          overall_completeness = sum(
              ctrl['metrics']['evidence_completeness']
              for ctrl in evidence['control_evidence'].values()
          ) / len(evidence['control_evidence'])

          print(f"Overall compliance: {overall_completeness:.1f}%")
          if overall_completeness < 85:
              print("❌ Compliance below threshold (85%)")
              sys.exit(1)
          print("✅ Compliance meets threshold")
          EOF
        env:
          EVIDENCE_PERIOD_DAYS: ${{ github.event.inputs.evidence_period_days || '30' }}

      - name: Upload SOC2 Evidence Artifacts
        uses: actions/upload-artifact@v3
        with:
          name: soc2-compliance-evidence
          path: |
            soc2-evidence.json
            soc2-compliance-report.md
            tfplan.json
          # GitHub caps artifact retention at 90 days; the S3 archive job
          # below provides the 7-year SOC2 retention.
          retention-days: 90

      - name: Create Compliance Issue
        if: failure()
        uses: actions/github-script@v6
        with:
          script: |
            github.rest.issues.create({
              owner: context.repo.owner,
              repo: context.repo.repo,
              title: `SOC2 Compliance Failure - ${new Date().toISOString()}`,
              body: `## SOC2 Compliance Check Failed

            **Workflow Run:** ${{ github.run_id }}
            **Branch:** ${{ github.ref }}
            **Commit:** ${{ github.sha }}

            Please review the compliance report and address any issues.`,
              labels: ['compliance', 'soc2', 'urgent']
            });

  evidence-archive:
    name: Archive Evidence for Audit
    runs-on: ubuntu-latest
    needs: infrastructure-compliance
    if: github.ref == 'refs/heads/main'
    steps:
      - name: Download Evidence Artifacts
        uses: actions/download-artifact@v3
        with:
          name: soc2-compliance-evidence

      - name: Configure AWS Credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: ${{ secrets.AWS_OIDC_ROLE }}
          aws-region: ${{ env.AWS_DEFAULT_REGION }}

      - name: Archive Evidence to S3
        run: |
          # Create timestamped archive
          TIMESTAMP=$(date +"%Y-%m-%d_%H-%M-%S")
          ARCHIVE_PREFIX="soc2-evidence/${TIMESTAMP}"

          # Upload evidence with proper metadata
          aws s3 cp soc2-evidence.json "s3://${{ secrets.SOC2_EVIDENCE_BUCKET }}/${ARCHIVE_PREFIX}/evidence.json" \
            --metadata "compliance-framework=SOC2-Type-II,collection-date=${TIMESTAMP},retention-years=7"
          aws s3 cp soc2-compliance-report.md "s3://${{ secrets.SOC2_EVIDENCE_BUCKET }}/${ARCHIVE_PREFIX}/report.md" \
            --metadata "compliance-framework=SOC2-Type-II,collection-date=${TIMESTAMP},retention-years=7"

          # Create evidence integrity hash
          sha256sum soc2-evidence.json > evidence-integrity.sha256
          aws s3 cp evidence-integrity.sha256 "s3://${{ secrets.SOC2_EVIDENCE_BUCKET }}/${ARCHIVE_PREFIX}/integrity.sha256"

          echo "Evidence archived to: s3://${{ secrets.SOC2_EVIDENCE_BUCKET }}/${ARCHIVE_PREFIX}/"
```
## Business Impact and ROI

### SOC2 Automation ROI Analysis

**Implementation Costs vs. Benefits:**

| Category | Manual SOC2 | Automated SOC2 | Savings |
|---|---|---|---|
| Annual Audit Preparation | 2,000 hours | 400 hours | $240K |
| Evidence Collection | 800 hours | 80 hours | $108K |
| Compliance Monitoring | 1,200 hours | 200 hours | $150K |
| Audit Duration | 8 weeks | 3 weeks | $75K |
| Documentation Effort | 600 hours | 100 hours | $75K |
| **Total Annual Savings** | | | **$648K** |
**ROI Calculation:**

```python
# Annual SOC2 automation value (illustrative staffing-cost assumptions)
AUDIT_PREPARATION_SAVINGS = 240000     # Reduced preparation time
EVIDENCE_COLLECTION_SAVINGS = 108000   # Automated evidence gathering
CONTINUOUS_MONITORING_SAVINGS = 150000 # Real-time compliance
AUDIT_DURATION_SAVINGS = 75000         # Faster audit completion
DOCUMENTATION_SAVINGS = 75000          # Automated reporting

TOTAL_ANNUAL_SAVINGS = (
    AUDIT_PREPARATION_SAVINGS
    + EVIDENCE_COLLECTION_SAVINGS
    + CONTINUOUS_MONITORING_SAVINGS
    + AUDIT_DURATION_SAVINGS
    + DOCUMENTATION_SAVINGS
)
# Total savings: $648,000 annually

IMPLEMENTATION_COST = 150000 # Initial automation setup
ANNUAL_MAINTENANCE = 30000   # Ongoing maintenance

FIRST_YEAR_ROI = (
    (TOTAL_ANNUAL_SAVINGS - IMPLEMENTATION_COST - ANNUAL_MAINTENANCE)
    / (IMPLEMENTATION_COST + ANNUAL_MAINTENANCE)
) * 100
# ROI: 260% in the first year

ONGOING_ROI = ((TOTAL_ANNUAL_SAVINGS - ANNUAL_MAINTENANCE) / ANNUAL_MAINTENANCE) * 100
# Ongoing ROI: 2,060% annually
```
## Conclusion
SOC2 Type II automation transforms compliance from a burdensome annual process into a continuous, value-adding business capability. By implementing Infrastructure as Code with embedded security controls, automated evidence collection, and continuous compliance monitoring, organizations achieve both regulatory compliance and improved security posture.
The key to successful SOC2 automation lies in building compliance into your development and operations workflows from the beginning, rather than treating it as an after-the-fact audit exercise. This approach reduces compliance costs while improving actual security outcomes.
Remember that SOC2 automation is not about checking boxes; it is about building systems that provide continuous assurance about your security controls and business processes.
Your SOC2 automation journey starts with implementing infrastructure as code with compliance tagging. Begin today and build towards continuous compliance validation.