Event-Driven Security Automation: CloudEvents + SIEM Integration

The Event-Driven Security Challenge
Your security infrastructure generates thousands of events every minute - failed logins, policy violations, vulnerability detections, and configuration changes. Traditional security operations rely on manual correlation, delayed response times, and siloed tools that can’t keep pace with modern attack speeds. By the time your security team notices and responds to an incident, attackers have already moved laterally through your environment.
Event-driven security automation transforms your security operations from reactive to proactive by enabling real-time threat detection, automated response, and intelligent correlation across your entire security stack.
Event-Driven Security Architecture
Event-driven security creates an intelligent security fabric that automatically correlates threats, triggers responses, and orchestrates remediation across your infrastructure in real time.
Core Components of Event-Driven Security
1. Event Collection and Normalization
- CloudEvents standard for consistent event format across tools
- Real-time event streaming with Kafka and Knative Eventing
- Multi-source integration (SIEM, cloud providers, security tools)
- Event enrichment and context aggregation
2. Intelligent Event Processing
- Complex event processing for threat correlation
- Machine learning-based anomaly detection
- Pattern matching and rule-based automation
- Risk scoring and prioritization algorithms
3. Automated Response and Orchestration
- SOAR (Security Orchestration, Automation, and Response) workflows
- Automated containment and remediation actions
- Cross-platform security tool orchestration
- Escalation and notification management
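The three layers above can be sketched as a minimal dispatcher: collectors publish normalized events, and a router fans each one out to the response handlers registered for its severity. Everything here (`SecurityEventRouter`, the handler names) is illustrative, not a reference to any real library:

```python
from typing import Callable, Dict, List

Handler = Callable[[dict], str]

class SecurityEventRouter:
    """Routes normalized security events to response handlers by severity."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Handler]] = {}

    def register(self, severity: str, handler: Handler) -> None:
        self._handlers.setdefault(severity, []).append(handler)

    def dispatch(self, event: dict) -> List[str]:
        # Fall back to MEDIUM when the producer omitted a severity.
        severity = event.get("severity", "MEDIUM")
        return [handle(event) for handle in self._handlers.get(severity, [])]

router = SecurityEventRouter()
router.register("CRITICAL", lambda e: f"page-oncall:{e['id']}")
router.register("CRITICAL", lambda e: f"isolate:{e['id']}")
router.register("LOW", lambda e: f"log-only:{e['id']}")

actions = router.dispatch({"id": "threat-12345", "severity": "CRITICAL"})
```

In a real deployment the handlers would be Knative services behind Trigger filters (shown later); the point is that routing decisions key off normalized event attributes, not tool-specific formats.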
CloudEvents Standard for Security
CloudEvents provides a standardized way to describe security events, enabling interoperability between different security tools and platforms.
Security CloudEvents Schema
1. Core Security Event Structure
```json
{
  "specversion": "1.0",
  "type": "com.company.security.threat.detected",
  "source": "https://security.company.com/ids",
  "id": "threat-12345-67890",
  "time": "2024-01-15T10:30:00Z",
  "datacontenttype": "application/json",
  "subject": "host/web-server-01",
  "data": {
    "severity": "HIGH",
    "category": "malware",
    "description": "Suspicious process execution detected",
    "affected_assets": [
      {
        "type": "host",
        "id": "web-server-01",
        "ip": "10.0.1.100",
        "hostname": "web-01.company.com"
      }
    ],
    "indicators": [
      {
        "type": "process",
        "value": "/tmp/suspicious_binary",
        "hash": "sha256:abc123..."
      }
    ],
    "metadata": {
      "tool": "crowdstrike-falcon",
      "rule_id": "rule-malware-001",
      "confidence": 0.95,
      "risk_score": 85
    }
  }
}
```
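A producer can assemble this envelope with nothing but the standard library. The helper below (`make_security_event` is a hypothetical name, not part of any SDK) fills in the required CloudEvents 1.0 attributes and serializes a structured-mode body:

```python
import json
import uuid
from datetime import datetime, timezone

def make_security_event(event_type: str, source: str, subject: str, data: dict) -> dict:
    """Build a CloudEvents 1.0 structured-mode envelope using only the stdlib."""
    return {
        "specversion": "1.0",
        "type": event_type,
        "source": source,
        "subject": subject,
        "id": str(uuid.uuid4()),                         # unique per event
        "time": datetime.now(timezone.utc).isoformat(),  # RFC 3339 timestamp
        "datacontenttype": "application/json",
        "data": data,
    }

event = make_security_event(
    "com.company.security.threat.detected",
    "https://security.company.com/ids",
    "host/web-server-01",
    {"severity": "HIGH", "category": "malware"},
)
body = json.dumps(event)  # POST this with Content-Type: application/cloudevents+json
```

The official `cloudevents` Python SDK offers the same thing with validation built in; the manual version just makes the envelope explicit.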
2. Event Types Taxonomy
```yaml
# security-events/event-types.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: security-event-types
  namespace: security-automation
data:
  event-types.yaml: |
    # Authentication Events
    com.company.security.auth.failed_login:
      description: "Failed authentication attempt"
      severity_levels: [LOW, MEDIUM, HIGH]
      required_fields: [user, source_ip, timestamp]
    com.company.security.auth.privileged_access:
      description: "Privileged account access"
      severity_levels: [MEDIUM, HIGH, CRITICAL]
      required_fields: [user, action, resource]

    # Network Events
    com.company.security.network.suspicious_traffic:
      description: "Anomalous network traffic detected"
      severity_levels: [LOW, MEDIUM, HIGH]
      required_fields: [source_ip, destination_ip, protocol, bytes]
    com.company.security.network.policy_violation:
      description: "Network policy violation"
      severity_levels: [MEDIUM, HIGH]
      required_fields: [source, destination, policy_name]

    # Vulnerability Events
    com.company.security.vuln.critical_found:
      description: "Critical vulnerability discovered"
      severity_levels: [HIGH, CRITICAL]
      required_fields: [cve_id, affected_system, cvss_score]

    # Configuration Events
    com.company.security.config.drift_detected:
      description: "Security configuration drift"
      severity_levels: [LOW, MEDIUM, HIGH]
      required_fields: [resource, expected_config, actual_config]

    # Malware Events
    com.company.security.malware.detected:
      description: "Malware detection"
      severity_levels: [HIGH, CRITICAL]
      required_fields: [file_hash, host, detection_engine]

    # Data Loss Prevention
    com.company.security.dlp.data_exfiltration:
      description: "Potential data exfiltration"
      severity_levels: [HIGH, CRITICAL]
      required_fields: [user, data_type, destination, size]
```
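The taxonomy doubles as a validation contract: an ingestion gate can reject events that omit their declared `required_fields` or carry an out-of-range severity. A minimal sketch (the inline `EVENT_TYPES` dict stands in for the ConfigMap contents; the function name is illustrative):

```python
from typing import Dict, List

# One entry from the taxonomy above, inlined for the example.
EVENT_TYPES: Dict[str, dict] = {
    "com.company.security.auth.failed_login": {
        "severity_levels": ["LOW", "MEDIUM", "HIGH"],
        "required_fields": ["user", "source_ip", "timestamp"],
    },
}

def validate_event(event_type: str, data: dict, taxonomy: Dict[str, dict]) -> List[str]:
    """Return a list of validation errors; empty means the event conforms."""
    spec = taxonomy.get(event_type)
    if spec is None:
        return [f"unknown event type: {event_type}"]
    errors = [f"missing field: {f}" for f in spec["required_fields"] if f not in data]
    severity = data.get("severity")
    if severity and severity not in spec["severity_levels"]:
        errors.append(f"invalid severity: {severity}")
    return errors
```

Running such a gate at the broker's edge keeps malformed events from propagating into correlation and response stages.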
3. Event Enrichment and Context
```python
#!/usr/bin/env python3
# event-processing/event-enricher.py
import json
import asyncio
from redis import asyncio as aioredis  # maintained successor to the standalone aioredis package
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from cloudevents.http import CloudEvent
import httpx


class SecurityEventEnricher:
    def __init__(self, config: Dict):
        self.config = config
        self.redis_client = None
        self.threat_intel_client = httpx.AsyncClient()

    async def initialize(self):
        """Initialize connections to data sources"""
        # from_url is synchronous in redis-py's asyncio API
        self.redis_client = aioredis.from_url(
            self.config['redis_url'],
            encoding="utf-8",
            decode_responses=True
        )

    async def enrich_event(self, event: CloudEvent) -> CloudEvent:
        """Enrich security event with additional context"""
        enriched_data = event.data.copy()

        # Add threat intelligence context
        enriched_data['threat_intelligence'] = await self._get_threat_intel(event)

        # Add asset context
        enriched_data['asset_context'] = await self._get_asset_context(event)

        # Add historical context
        enriched_data['historical_context'] = await self._get_historical_context(event)

        # Add user context
        enriched_data['user_context'] = await self._get_user_context(event)

        # Calculate enriched risk score
        enriched_data['enriched_risk_score'] = self._calculate_enriched_risk_score(enriched_data)

        # Create enriched event
        enriched_event = CloudEvent({
            "type": event["type"],
            "source": event["source"],
            "subject": event.get("subject"),
            "id": f"enriched-{event['id']}",
            "time": datetime.now().isoformat(),
            "datacontenttype": "application/json"
        }, enriched_data)
        return enriched_event

    async def _get_threat_intel(self, event: CloudEvent) -> Dict:
        """Get threat intelligence data for event indicators"""
        threat_intel = {
            "indicators": [],
            "campaigns": [],
            "threat_actors": []
        }
        event_data = event.data
        indicators = event_data.get('indicators', [])
        for indicator in indicators:
            if indicator['type'] == 'ip':
                intel_data = await self._query_ip_reputation(indicator['value'])
                threat_intel['indicators'].append({
                    "indicator": indicator['value'],
                    "type": "ip",
                    "reputation": intel_data.get('reputation', 'unknown'),
                    "malware_families": intel_data.get('malware_families', []),
                    "first_seen": intel_data.get('first_seen'),
                    "last_seen": intel_data.get('last_seen')
                })
            elif indicator['type'] == 'hash':
                intel_data = await self._query_file_reputation(indicator['value'])
                threat_intel['indicators'].append({
                    "indicator": indicator['value'],
                    "type": "hash",
                    "reputation": intel_data.get('reputation', 'unknown'),
                    "detection_rate": intel_data.get('detection_rate', 0),
                    "file_type": intel_data.get('file_type')
                })
        return threat_intel

    async def _get_asset_context(self, event: CloudEvent) -> Dict:
        """Get asset context information"""
        asset_context = {}
        affected_assets = event.data.get('affected_assets', [])
        for asset in affected_assets:
            asset_id = asset.get('id')
            if asset_id:
                # Query asset database/CMDB
                asset_info = await self._query_asset_database(asset_id)
                asset_context[asset_id] = {
                    "criticality": asset_info.get('criticality', 'medium'),
                    "environment": asset_info.get('environment', 'unknown'),
                    "owner": asset_info.get('owner', 'unknown'),
                    "business_unit": asset_info.get('business_unit', 'unknown'),
                    "compliance_requirements": asset_info.get('compliance_requirements', []),
                    "installed_software": asset_info.get('installed_software', []),
                    "network_zone": asset_info.get('network_zone', 'unknown')
                }
        return asset_context

    async def _get_historical_context(self, event: CloudEvent) -> Dict:
        """Get historical context for similar events"""
        # Query for similar events in the last 30 days
        similar_events_key = f"events:{event['type']}:{event.get('subject', 'unknown')}"
        similar_events = await self.redis_client.lrange(similar_events_key, 0, -1)
        historical_context = {
            "similar_events_count": len(similar_events),
            "first_occurrence": None,
            "frequency_trend": "unknown",
            "previous_outcomes": []
        }
        if similar_events:
            # Parse historical events (note: don't shadow the `event` parameter)
            parsed_events = [json.loads(raw) for raw in similar_events]
            historical_context["first_occurrence"] = min(
                e['timestamp'] for e in parsed_events
            )
            # Calculate frequency trend
            recent_events = [
                e for e in parsed_events
                if datetime.fromisoformat(e['timestamp']) > datetime.now() - timedelta(days=7)
            ]
            older_events = [
                e for e in parsed_events
                if datetime.fromisoformat(e['timestamp']) <= datetime.now() - timedelta(days=7)
            ]
            if len(recent_events) > len(older_events):
                historical_context["frequency_trend"] = "increasing"
            elif len(recent_events) < len(older_events):
                historical_context["frequency_trend"] = "decreasing"
            else:
                historical_context["frequency_trend"] = "stable"
            # Extract previous outcomes
            historical_context["previous_outcomes"] = [
                {
                    "timestamp": e['timestamp'],
                    "resolution": e.get('resolution', 'unknown'),
                    "time_to_resolution": e.get('time_to_resolution', 0)
                }
                for e in parsed_events[-5:]  # Last 5 outcomes
            ]
        return historical_context

    async def _get_user_context(self, event: CloudEvent) -> Dict:
        """Get user context information"""
        user_context = {}
        # Extract user information from event
        event_data = event.data
        user = event_data.get('user') or event_data.get('username')
        if user:
            # Query user directory/HR system
            user_info = await self._query_user_directory(user)
            user_context = {
                "department": user_info.get('department', 'unknown'),
                "title": user_info.get('title', 'unknown'),
                "manager": user_info.get('manager', 'unknown'),
                "access_level": user_info.get('access_level', 'standard'),
                "recent_activity": await self._get_user_recent_activity(user),
                "risk_factors": await self._assess_user_risk_factors(user)
            }
        return user_context

    def _calculate_enriched_risk_score(self, enriched_data: Dict) -> int:
        """Calculate enhanced risk score based on enriched context"""
        base_score = enriched_data.get('metadata', {}).get('risk_score', 50)

        # Threat intelligence multiplier
        threat_intel = enriched_data.get('threat_intelligence', {})
        malicious_indicators = sum(
            1 for indicator in threat_intel.get('indicators', [])
            if indicator.get('reputation') == 'malicious'
        )
        if malicious_indicators > 0:
            base_score += malicious_indicators * 20

        # Asset criticality multiplier
        asset_context = enriched_data.get('asset_context', {})
        critical_assets = sum(
            1 for asset_id, context in asset_context.items()
            if context.get('criticality') == 'critical'
        )
        if critical_assets > 0:
            base_score += critical_assets * 15

        # Historical context multiplier
        historical_context = enriched_data.get('historical_context', {})
        if historical_context.get('frequency_trend') == 'increasing':
            base_score += 10

        # User context multiplier
        user_context = enriched_data.get('user_context', {})
        if user_context.get('access_level') == 'privileged':
            base_score += 15

        return min(base_score, 100)  # Cap at 100

    async def _query_ip_reputation(self, ip: str) -> Dict:
        """Query threat intelligence for IP reputation"""
        # Mock implementation - integrate with actual threat intel APIs
        return {
            "reputation": "unknown",
            "malware_families": [],
            "first_seen": None,
            "last_seen": None
        }

    async def _query_file_reputation(self, file_hash: str) -> Dict:
        """Query threat intelligence for file reputation"""
        # Mock implementation - integrate with VirusTotal, etc.
        return {
            "reputation": "unknown",
            "detection_rate": 0,
            "file_type": "unknown"
        }

    async def _query_asset_database(self, asset_id: str) -> Dict:
        """Query asset database/CMDB"""
        # Mock implementation - integrate with CMDB
        return {
            "criticality": "medium",
            "environment": "production",
            "owner": "platform-team",
            "business_unit": "engineering"
        }

    async def _query_user_directory(self, user: str) -> Dict:
        """Query user directory/HR system"""
        # Mock implementation - integrate with Active Directory, etc.
        return {
            "department": "engineering",
            "title": "software engineer",
            "access_level": "standard"
        }

    async def _get_user_recent_activity(self, user: str) -> List[Dict]:
        """Get user's recent security-relevant activity"""
        # Mock implementation
        return []

    async def _assess_user_risk_factors(self, user: str) -> List[str]:
        """Assess user-specific risk factors"""
        # Mock implementation
        return []


if __name__ == "__main__":
    # Example usage
    config = {
        "redis_url": "redis://localhost:6379"
    }
    enricher = SecurityEventEnricher(config)
    asyncio.run(enricher.initialize())
```
Knative Eventing for Security Automation
Knative Eventing provides a cloud-native event-driven architecture that enables scalable, serverless security automation workflows.
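Subscribers in this architecture are plain HTTP services: the broker delivers each event as a POST with the CloudEvents attributes carried in `Ce-*` headers (binary content mode). A stdlib-only sketch of the header parsing every subscriber performs (the helper name is hypothetical; the `cloudevents` SDK's `from_http` does this for you):

```python
import json

def parse_binary_cloudevent(headers: dict, body: bytes) -> dict:
    """Recover CloudEvents attributes from binary-mode HTTP headers (ce-* prefixed)."""
    attrs = {
        key[3:].lower(): value           # "Ce-Type" -> "type"
        for key, value in headers.items()
        if key.lower().startswith("ce-")
    }
    attrs["data"] = json.loads(body) if body else None
    return attrs

event = parse_binary_cloudevent(
    {
        "Ce-Specversion": "1.0",
        "Ce-Type": "com.company.security.threat.detected",
        "Ce-Source": "https://security.company.com/ids",
        "Ce-Id": "threat-12345",
        "Content-Type": "application/json",  # not a ce-* header, so not an attribute
    },
    b'{"severity": "HIGH"}',
)
```

Because the contract is just HTTP plus headers, any language or framework can participate in the event mesh, which is what makes the Knative services below swappable.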
Knative Security Event Processing
1. Event Sources Configuration
```yaml
# knative-eventing/security-event-sources.yaml
apiVersion: sources.knative.dev/v1
kind: ApiServerSource
metadata:
  name: k8s-security-events
  namespace: security-automation
spec:
  serviceAccountName: security-automation-sa
  mode: Resource
  resources:
    - apiVersion: v1
      kind: Pod
      labelSelector:
        matchLabels:
          security.policy/monitored: 'true'
    - apiVersion: apps/v1
      kind: Deployment
    - apiVersion: v1
      kind: Secret
    - apiVersion: networking.k8s.io/v1
      kind: NetworkPolicy
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: security-broker
---
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: siem-events-source
  namespace: security-automation
spec:
  consumerGroup: security-automation
  bootstrapServers:
    - kafka.security.svc.cluster.local:9092
  topics:
    - security.alerts
    - security.threats
    - security.vulnerabilities
    - security.compliance
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: security-broker
---
apiVersion: sources.knative.dev/v1
kind: ContainerSource
metadata:
  name: custom-security-source
  namespace: security-automation
spec:
  template:
    spec:
      containers:
        - image: company/security-event-collector:latest
          env:
            - name: CLOUD_EVENTS_SINK
              value: 'http://broker-ingress.knative-eventing.svc.cluster.local/security-automation/security-broker'
            - name: POLL_INTERVAL
              value: '30s'
            - name: SECURITY_TOOLS_CONFIG
              valueFrom:
                configMapKeyRef:
                  name: security-tools-config
                  key: config.yaml
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: security-broker
```
2. Event Processing Pipeline
```yaml
# knative-eventing/security-processing-pipeline.yaml
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: security-broker
  namespace: security-automation
spec:
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing
---
# Event enrichment service
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: event-enricher
  namespace: security-automation
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/minScale: '2'
        autoscaling.knative.dev/maxScale: '20'
        autoscaling.knative.dev/target: '100'
    spec:
      containers:
        - image: company/security-event-enricher:latest
          env:
            - name: REDIS_URL
              value: 'redis://redis.security.svc.cluster.local:6379'
            - name: THREAT_INTEL_API_KEY
              valueFrom:
                secretKeyRef:
                  name: threat-intel-credentials
                  key: api-key
          resources:
            requests:
              memory: '256Mi'
              cpu: '200m'
            limits:
              memory: '512Mi'
              cpu: '500m'
---
# Threat correlation service
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: threat-correlator
  namespace: security-automation
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/minScale: '1'
        autoscaling.knative.dev/maxScale: '10'
    spec:
      containers:
        - image: company/threat-correlator:latest
          env:
            - name: CORRELATION_WINDOW
              value: '300s'
            - name: MACHINE_LEARNING_MODEL_PATH
              value: '/models/threat-correlation-v2.pkl'
          volumeMounts:
            - name: ml-models
              mountPath: /models
          resources:
            requests:
              memory: '512Mi'
              cpu: '500m'
            limits:
              memory: '2Gi'
              cpu: '1000m'
      volumes:
        - name: ml-models
          persistentVolumeClaim:
            claimName: ml-models-pvc
---
# Automated response service
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: automated-response
  namespace: security-automation
spec:
  template:
    spec:
      containers:
        - image: company/automated-response:latest
          env:
            - name: RESPONSE_TIMEOUT
              value: '300s'
            - name: DRY_RUN_MODE
              value: 'false'
          resources:
            requests:
              memory: '256Mi'
              cpu: '200m'
            limits:
              memory: '512Mi'
              cpu: '500m'
```
3. Event Routing and Filtering
```yaml
# knative-eventing/event-routing.yaml
# Route high-severity events to immediate response.
# Note: filtering on `severity` assumes producers set it as a CloudEvents
# extension attribute; Trigger attribute filters cannot inspect the data payload.
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: critical-threat-response
  namespace: security-automation
spec:
  broker: security-broker
  filter:
    attributes:
      type: com.company.security.threat.detected
      severity: CRITICAL
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: immediate-response
    uri: /critical-threats
---
# Route authentication events to user behavior analysis
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: auth-behavior-analysis
  namespace: security-automation
spec:
  broker: security-broker
  filter:
    attributes:
      type: com.company.security.auth.failed_login
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: user-behavior-analyzer
---
# Route network events to traffic analysis
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: network-traffic-analysis
  namespace: security-automation
spec:
  broker: security-broker
  filter:
    attributes:
      type: com.company.security.network.suspicious_traffic
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: network-analyzer
---
# Route all events to SIEM for centralized logging
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: siem-integration
  namespace: security-automation
spec:
  broker: security-broker
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: siem-forwarder
    uri: /ingest
---
# Complex event routing with CEL expressions
# (requires Knative's "new-trigger-filters" experimental feature)
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: complex-threat-detection
  namespace: security-automation
spec:
  broker: security-broker
  filters:
    - exact:
        type: com.company.security.threat.detected
    # Route events with multiple indicators and high confidence
    - cel: 'data.metadata.confidence > 0.8 && size(data.indicators) > 1'
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: advanced-threat-processor
```
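To smoke-test the routing above, you can POST a binary-mode CloudEvent straight to the broker ingress. A sketch using only `urllib` — the ingress URL matches the ContainerSource configuration earlier, the helper name is illustrative, and the actual send is left commented out since the URL only resolves inside the cluster:

```python
import json
import urllib.request

BROKER_URL = ("http://broker-ingress.knative-eventing.svc.cluster.local"
              "/security-automation/security-broker")

def build_event_request(event_type: str, source: str, data: dict,
                        url: str = BROKER_URL) -> urllib.request.Request:
    """Build a binary-mode CloudEvents POST for the broker ingress."""
    return urllib.request.Request(
        url,
        data=json.dumps(data).encode(),
        headers={
            "Content-Type": "application/json",
            # CloudEvents attributes travel as Ce-* headers in binary mode.
            "Ce-Specversion": "1.0",
            "Ce-Type": event_type,
            "Ce-Source": source,
            "Ce-Id": "smoke-test-0001",
        },
        method="POST",
    )

req = build_event_request(
    "com.company.security.threat.detected",
    "https://security.company.com/ids",
    {"severity": "CRITICAL", "description": "routing smoke test"},
)
# urllib.request.urlopen(req)  # uncomment when running inside the cluster
```

If the critical-threat Trigger is wired correctly, the `immediate-response` service should log a delivery for this event within a few seconds.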
SIEM Integration and Event Correlation
Elastic Security (ELK) Integration
1. CloudEvents to ELK Pipeline
```python
#!/usr/bin/env python3
# siem-integration/elk-forwarder.py
import json
import asyncio
from datetime import datetime
from typing import Dict, List
from elasticsearch import AsyncElasticsearch
from cloudevents.http import CloudEvent, from_http
from aiohttp import web
import logging


class ELKForwarder:
    def __init__(self, config: Dict):
        self.config = config
        self.es_client = AsyncElasticsearch([config['elasticsearch_url']])
        self.logger = logging.getLogger(__name__)

    async def process_security_event(self, request: web.Request) -> web.Response:
        """Process incoming CloudEvent and forward to Elasticsearch"""
        try:
            # Parse CloudEvent
            event = from_http(request.headers, await request.read())

            # Transform to ELK format
            elk_document = await self._transform_to_elk_format(event)

            # Determine index name based on event type
            index_name = self._get_index_name(event)

            # Index document in Elasticsearch
            # (elasticsearch-py 8.x: use `document=`; the old `doc_type` argument is gone)
            await self.es_client.index(
                index=index_name,
                document=elk_document
            )

            # Create timeline entry for Elastic Security
            await self._create_timeline_entry(event, elk_document)

            # Update threat indicators if present
            await self._update_threat_indicators(event)

            self.logger.info(f"Successfully indexed event {event['id']} to {index_name}")
            return web.Response(status=200, text="Event processed successfully")
        except Exception as e:
            self.logger.error(f"Error processing event: {str(e)}")
            return web.Response(status=500, text=f"Error: {str(e)}")

    async def _transform_to_elk_format(self, event: CloudEvent) -> Dict:
        """Transform CloudEvent to ELK/ECS format"""
        event_data = event.data
        timestamp = event.get('time', datetime.now().isoformat())

        # Map to Elastic Common Schema (ECS)
        elk_document = {
            "@timestamp": timestamp,
            "event": {
                "id": event['id'],
                "type": [self._map_event_type(event['type'])],
                "category": [self._map_event_category(event['type'])],
                "outcome": self._determine_outcome(event_data),
                "severity": self._map_severity(event_data.get('severity', 'MEDIUM')),
                "original": json.dumps(event_data)
            },
            "cloud": {
                "instance": {
                    "id": event_data.get('instance_id'),
                    "name": event_data.get('instance_name')
                },
                "provider": event_data.get('cloud_provider', 'unknown')
            },
            "host": self._extract_host_info(event_data),
            "user": self._extract_user_info(event_data),
            "network": self._extract_network_info(event_data),
            "process": self._extract_process_info(event_data),
            "file": self._extract_file_info(event_data),
            "threat": self._extract_threat_info(event_data),
            "security": {
                "event_source": event['source'],
                "rule_id": event_data.get('metadata', {}).get('rule_id'),
                "confidence": event_data.get('metadata', {}).get('confidence'),
                "risk_score": event_data.get('metadata', {}).get('risk_score')
            },
            "tags": self._generate_tags(event, event_data),
            "labels": event_data.get('labels', {}),
            "custom": {
                "enrichment": event_data.get('threat_intelligence', {}),
                "asset_context": event_data.get('asset_context', {}),
                "historical_context": event_data.get('historical_context', {})
            }
        }
        return elk_document

    def _get_index_name(self, event: CloudEvent) -> str:
        """Determine Elasticsearch index name based on event type"""
        event_type = event['type']
        date_suffix = datetime.now().strftime('%Y.%m.%d')
        if 'auth' in event_type:
            return f"security-auth-{date_suffix}"
        elif 'network' in event_type:
            return f"security-network-{date_suffix}"
        elif 'malware' in event_type:
            return f"security-malware-{date_suffix}"
        elif 'vuln' in event_type:
            return f"security-vulnerability-{date_suffix}"
        else:
            return f"security-general-{date_suffix}"

    async def _create_timeline_entry(self, event: CloudEvent, elk_document: Dict):
        """Create timeline entry in Elastic Security"""
        timeline_entry = {
            "@timestamp": elk_document["@timestamp"],
            "timeline": {
                "id": f"timeline-{event['id']}",
                "title": f"Security Event: {event['type']}",
                "description": elk_document["event"].get("original", ""),
                "status": "active"
            },
            "event": elk_document["event"],
            "host": elk_document.get("host", {}),
            "user": elk_document.get("user", {}),
            "threat": elk_document.get("threat", {})
        }
        await self.es_client.index(
            index=f"timeline-security-{datetime.now().strftime('%Y.%m')}",
            document=timeline_entry
        )

    async def _update_threat_indicators(self, event: CloudEvent):
        """Update threat indicator indices"""
        event_data = event.data
        indicators = event_data.get('indicators', [])
        for indicator in indicators:
            indicator_doc = {
                "@timestamp": datetime.now().isoformat(),
                "threat": {
                    "indicator": {
                        "type": indicator['type'],
                        "value": indicator['value'],
                        "first_seen": datetime.now().isoformat(),
                        "last_seen": datetime.now().isoformat(),
                        "confidence": event_data.get('metadata', {}).get('confidence', 0.5)
                    }
                },
                "event": {
                    "reference": event['id'],
                    "source": event['source']
                }
            }
            # Reuse a deterministic id so repeated sightings update the same document
            await self.es_client.index(
                index=f"threat-indicators-{datetime.now().strftime('%Y.%m')}",
                id=f"{indicator['type']}-{hash(indicator['value'])}",
                document=indicator_doc
            )

    def _map_event_type(self, cloud_event_type: str) -> str:
        """Map CloudEvent type to ECS event type"""
        mapping = {
            'com.company.security.auth.failed_login': 'authentication',
            'com.company.security.network.suspicious_traffic': 'network',
            'com.company.security.malware.detected': 'malware',
            'com.company.security.vuln.critical_found': 'vulnerability'
        }
        return mapping.get(cloud_event_type, 'security')

    def _map_event_category(self, cloud_event_type: str) -> str:
        """Map CloudEvent type to ECS event category"""
        if 'auth' in cloud_event_type:
            return 'authentication'
        elif 'network' in cloud_event_type:
            return 'network'
        elif 'malware' in cloud_event_type:
            return 'malware'
        elif 'vuln' in cloud_event_type:
            return 'vulnerability'
        else:
            return 'security'

    def _extract_host_info(self, event_data: Dict) -> Dict:
        """Extract host information for ECS format"""
        affected_assets = event_data.get('affected_assets', [])
        host_assets = [asset for asset in affected_assets if asset.get('type') == 'host']
        if host_assets:
            host = host_assets[0]
            return {
                "id": host.get('id'),
                "name": host.get('hostname'),
                "ip": [host.get('ip')] if host.get('ip') else []
            }
        return {}

    def _extract_user_info(self, event_data: Dict) -> Dict:
        """Extract user information for ECS format"""
        user = event_data.get('user') or event_data.get('username')
        if user:
            return {
                "name": user,
                "id": event_data.get('user_id')
            }
        return {}

    # The helpers below are referenced above but were missing from the original
    # listing; these are minimal placeholders to keep the module runnable.
    def _determine_outcome(self, event_data: Dict) -> str:
        return event_data.get('outcome', 'unknown')

    def _map_severity(self, severity: str) -> int:
        # Rough mapping of severity labels onto ECS's numeric event.severity
        return {'LOW': 21, 'MEDIUM': 47, 'HIGH': 73, 'CRITICAL': 99}.get(severity, 47)

    def _extract_network_info(self, event_data: Dict) -> Dict:
        return {k: event_data[k] for k in ('source_ip', 'destination_ip', 'protocol')
                if k in event_data}

    def _extract_process_info(self, event_data: Dict) -> Dict:
        return event_data.get('process', {})

    def _extract_file_info(self, event_data: Dict) -> Dict:
        return event_data.get('file', {})

    def _extract_threat_info(self, event_data: Dict) -> Dict:
        return event_data.get('threat_intelligence', {})

    def _generate_tags(self, event: CloudEvent, event_data: Dict) -> List[str]:
        return ['cloudevents', event['type'], event_data.get('severity', 'MEDIUM').lower()]


# Web application to receive CloudEvents
app = web.Application()
config = {
    "elasticsearch_url": "https://elasticsearch.security.svc.cluster.local:9200"
}
forwarder = ELKForwarder(config)
app.router.add_post('/ingest', forwarder.process_security_event)

if __name__ == '__main__':
    web.run_app(app, host='0.0.0.0', port=8080)
```
2. Splunk Integration
```python
#!/usr/bin/env python3
# siem-integration/splunk-forwarder.py
import json
import asyncio
from datetime import datetime
from typing import Dict
import httpx
from cloudevents.http import CloudEvent, from_http
from aiohttp import web


class SplunkForwarder:
    def __init__(self, config: Dict):
        self.config = config
        self.splunk_client = httpx.AsyncClient(
            base_url=config['splunk_url'],
            headers={
                'Authorization': f"Splunk {config['splunk_token']}",
                'Content-Type': 'application/json'
            },
            verify=False  # Configure proper SSL in production
        )

    async def process_security_event(self, request: web.Request) -> web.Response:
        """Process CloudEvent and send to Splunk HEC"""
        try:
            # Parse CloudEvent
            event = from_http(request.headers, await request.read())

            # Transform to Splunk format
            splunk_event = self._transform_to_splunk_format(event)

            # Send to Splunk HEC
            response = await self.splunk_client.post(
                '/services/collector/event',
                json=splunk_event
            )
            if response.status_code == 200:
                return web.Response(status=200, text="Event sent to Splunk")
            else:
                return web.Response(status=500, text=f"Splunk error: {response.text}")
        except Exception as e:
            return web.Response(status=500, text=f"Error: {str(e)}")

    def _transform_to_splunk_format(self, event: CloudEvent) -> Dict:
        """Transform CloudEvent to Splunk HEC format"""
        event_data = event.data
        # fromisoformat() before Python 3.11 rejects a trailing "Z"
        event_time = event.get('time', datetime.now().isoformat()).replace('Z', '+00:00')
        splunk_event = {
            "time": datetime.fromisoformat(event_time).timestamp(),
            "host": self._extract_host(event_data),
            "source": event['source'],
            "sourcetype": self._determine_sourcetype(event['type']),
            "index": self._determine_index(event['type']),
            "event": {
                "cloud_event_id": event['id'],
                "cloud_event_type": event['type'],
                "cloud_event_subject": event.get('subject'),
                "severity": event_data.get('severity', 'MEDIUM'),
                "category": self._extract_category(event['type']),
                "description": event_data.get('description', ''),
                "risk_score": event_data.get('metadata', {}).get('risk_score', 50),
                "confidence": event_data.get('metadata', {}).get('confidence', 0.5),
                "indicators": event_data.get('indicators', []),
                "affected_assets": event_data.get('affected_assets', []),
                "threat_intelligence": event_data.get('threat_intelligence', {}),
                "asset_context": event_data.get('asset_context', {}),
                "historical_context": event_data.get('historical_context', {}),
                "user_context": event_data.get('user_context', {}),
                "enriched_risk_score": event_data.get('enriched_risk_score', 50),
                "raw_event": event_data
            }
        }
        return splunk_event

    def _determine_sourcetype(self, event_type: str) -> str:
        """Determine Splunk sourcetype based on event type"""
        if 'auth' in event_type:
            return 'security:authentication'
        elif 'network' in event_type:
            return 'security:network'
        elif 'malware' in event_type:
            return 'security:malware'
        elif 'vuln' in event_type:
            return 'security:vulnerability'
        else:
            return 'security:general'

    def _determine_index(self, event_type: str) -> str:
        """Determine Splunk index based on event type"""
        if any(keyword in event_type for keyword in ['critical', 'high']):
            return 'security_critical'
        else:
            return 'security_main'

    # Helpers referenced above but missing from the original listing;
    # minimal placeholders to keep the module runnable.
    def _extract_host(self, event_data: Dict) -> str:
        affected = event_data.get('affected_assets', [])
        return affected[0].get('hostname', 'unknown') if affected else 'unknown'

    def _extract_category(self, event_type: str) -> str:
        # e.g. com.company.security.auth.failed_login -> "auth"
        parts = event_type.split('.')
        return parts[3] if len(parts) > 3 else 'general'


if __name__ == '__main__':
    # Placeholder wiring mirroring the ELK forwarder; supply the real HEC
    # endpoint and token via configuration in practice.
    app = web.Application()
    forwarder = SplunkForwarder({
        "splunk_url": "https://splunk-hec.security.svc.cluster.local:8088",
        "splunk_token": "REPLACE_WITH_HEC_TOKEN"
    })
    app.router.add_post('/ingest', forwarder.process_security_event)
    web.run_app(app, host='0.0.0.0', port=8080)
```
Automated Incident Response Workflows
SOAR Integration with Phantom/Splunk SOAR
```python
#!/usr/bin/env python3
# incident-response/soar-orchestrator.py
import json
import asyncio
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from cloudevents.http import CloudEvent
import httpx
from enum import Enum


class ResponseAction(Enum):
    ISOLATE_HOST = "isolate_host"
    BLOCK_IP = "block_ip"
    DISABLE_USER = "disable_user"
    QUARANTINE_FILE = "quarantine_file"
    CREATE_TICKET = "create_ticket"
    NOTIFY_TEAM = "notify_team"
    COLLECT_FORENSICS = "collect_forensics"


class SOAROrchestrator:
    def __init__(self, config: Dict):
        self.config = config
        self.phantom_client = httpx.AsyncClient(
            base_url=config['phantom_url'],
            headers={'ph-auth-token': config['phantom_token']}
        )
        self.response_playbooks = self._load_response_playbooks()

    def _load_response_playbooks(self) -> Dict:
        """Load automated response playbooks"""
        return {
            "malware_detected": {
                "severity_thresholds": {
                    "CRITICAL": [
                        ResponseAction.ISOLATE_HOST,
                        ResponseAction.QUARANTINE_FILE,
                        ResponseAction.CREATE_TICKET,
                        ResponseAction.NOTIFY_TEAM,
                        ResponseAction.COLLECT_FORENSICS
                    ],
                    "HIGH": [
                        ResponseAction.QUARANTINE_FILE,
                        ResponseAction.CREATE_TICKET,
                        ResponseAction.NOTIFY_TEAM
                    ],
                    "MEDIUM": [
                        ResponseAction.CREATE_TICKET
                    ]
                },
                "containment_timeout": 300,  # 5 minutes
                "escalation_timeout": 1800   # 30 minutes
            },
            "network_intrusion": {
                "severity_thresholds": {
                    "CRITICAL": [
                        ResponseAction.BLOCK_IP,
                        ResponseAction.ISOLATE_HOST,
                        ResponseAction.CREATE_TICKET,
                        ResponseAction.NOTIFY_TEAM,
                        ResponseAction.COLLECT_FORENSICS
                    ],
                    "HIGH": [
                        ResponseAction.BLOCK_IP,
                        ResponseAction.CREATE_TICKET,
                        ResponseAction.NOTIFY_TEAM
                    ]
                },
                "containment_timeout": 180,
                "escalation_timeout": 900
            },
            "data_exfiltration": {
                "severity_thresholds": {
                    "CRITICAL": [
                        ResponseAction.BLOCK_IP,
                        ResponseAction.DISABLE_USER,
                        ResponseAction.ISOLATE_HOST,
                        ResponseAction.CREATE_TICKET,
                        ResponseAction.NOTIFY_TEAM,
                        ResponseAction.COLLECT_FORENSICS
                    ],
                    "HIGH": [
                        ResponseAction.DISABLE_USER,
                        ResponseAction.CREATE_TICKET,
                        ResponseAction.NOTIFY_TEAM
                    ]
                },
                "containment_timeout": 120,  # 2 minutes
                "escalation_timeout": 600    # 10 minutes
            }
        }

    async def process_security_incident(self, event: CloudEvent) -> Dict:
        """Process security incident and execute automated response"""
        incident_id = f"incident-{event['id']}"
        event_data = event.data

        # Determine incident type and severity
        incident_type = self._classify_incident(event['type'])
        severity = event_data.get('severity', 'MEDIUM')

        # Create incident in SOAR platform
        incident = await self._create_phantom_incident(incident_id, event, incident_type, severity)

        # Execute automated response based on playbook
        response_result = await self._execute_response_playbook(
            incident_id, incident_type, severity, event_data
        )

        # Schedule escalation if needed
        await self._schedule_escalation(incident_id, incident_type, severity)

        return {
            "incident_id": incident_id,
            "incident_type": incident_type,
            "severity": severity,
            "automated_actions": response_result['actions_taken'],
            "manual_actions_required": response_result['manual_actions'],
            "escalation_scheduled": response_result['escalation_scheduled']
        }

    async def _create_phantom_incident(self, incident_id: str, event: CloudEvent,
                                       incident_type: str, severity: str) -> Dict:
        """Create incident in Phantom SOAR"""
        event_data = event.data
        incident_data = {
            "name": f"{incident_type.title()} - {incident_id}",
            "description": event_data.get('description', 'Automated security incident'),
            "severity": severity.lower(),
            "status": "new",
            "source_data_identifier": event['id'],
            "custom_fields": {
                "cloud_event_type": event['type'],
                "cloud_event_source": event['source'],
                "risk_score": event_data.get('enriched_risk_score', 50),
                "affected_assets": json.dumps(event_data.get('affected_assets', [])),
                "indicators": json.dumps(event_data.get('indicators', [])),
                "threat_intelligence": json.dumps(event_data.get('threat_intelligence', {}))
            },
            "artifacts": self._create_artifacts_from_event(event_data)
        }
        response = await self.phantom_client.post('/rest/container', json=incident_data)
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Failed to create Phantom incident: {response.text}")

    async def _execute_response_playbook(self, incident_id: str, incident_type: str,
                                         severity: str, event_data: Dict) -> Dict:
        """Execute automated response playbook"""
        playbook = self.response_playbooks.get(incident_type, {})
        actions = playbook.get('severity_thresholds', {}).get(severity, [])
        actions_taken = []
        manual_actions = []
        for action in actions:
            try:
                if action == ResponseAction.ISOLATE_HOST:
                    result = await self._isolate_host(event_data)
                    actions_taken.append({"action": "isolate_host", "result": result})
                elif action == ResponseAction.BLOCK_IP:
                    result = await self._block_ip(event_data)
                    actions_taken.append({"action": "block_ip", "result": result})
                elif action == ResponseAction.DISABLE_USER:
                    result = await self._disable_user(event_data)
                    actions_taken.append({"action": "disable_user", "result": result})
                elif action == ResponseAction.QUARANTINE_FILE:
                    result = await self._quarantine_file(event_data)
                    actions_taken.append({"action": "quarantine_file", "result": result})
                elif action == ResponseAction.CREATE_TICKET:
                    result = await self._create_ticket(incident_id, event_data)
                    actions_taken.append({"action": "create_ticket", "result": result})
                elif action == ResponseAction.NOTIFY_TEAM:
                    result = await self._notify_security_team(incident_id, event_data)
                    actions_taken.append({"action": "notify_team", "result": result})
                elif action == ResponseAction.COLLECT_FORENSICS:
                    # This typically requires manual intervention
                    manual_actions.append({
                        "action": "collect_forensics",
                        "description": "Collect forensic evidence from affected systems",
                        "priority": "high"
                    })
            except Exception as e:
                actions_taken.append({
                    "action": action.value,
                    "result": "failed",
                    "error": str(e)
                })
        # Schedule escalation based on playbook timing
escalation_scheduled = False
containment_timeout = playbook.get('containment_timeout', 600)
if severity in ['CRITICAL', 'HIGH']:
asyncio.create_task(
self._schedule_escalation_timer(incident_id, containment_timeout)
)
escalation_scheduled = True
return {
"actions_taken": actions_taken,
"manual_actions": manual_actions,
"escalation_scheduled": escalation_scheduled
}
    async def _isolate_host(self, event_data: Dict) -> str:
        """Isolate affected hosts from the network"""
        affected_assets = event_data.get('affected_assets', [])
        host_assets = [asset for asset in affected_assets if asset.get('type') == 'host']
        isolated_hosts = []
        for host in host_assets:
            host_id = host.get('id')
            if host_id:
                # Integration with endpoint security platform (CrowdStrike, Carbon Black, etc.)
                await self._call_endpoint_security_api(
                    'isolate', {'host_id': host_id}
                )
                isolated_hosts.append(host_id)
        if isolated_hosts:
            return f"Isolated hosts: {', '.join(isolated_hosts)}"
        return "No hosts found to isolate"
    async def _block_ip(self, event_data: Dict) -> str:
        """Block malicious IP addresses"""
        indicators = event_data.get('indicators', [])
        ip_indicators = [ind for ind in indicators if ind.get('type') == 'ip']
        blocked_ips = []
        for indicator in ip_indicators:
            ip_address = indicator.get('value')
            if ip_address:
                # Integration with firewall/network security platform
                await self._call_firewall_api(
                    'block_ip', {'ip': ip_address, 'duration': 3600}
                )
                blocked_ips.append(ip_address)
        if blocked_ips:
            return f"Blocked IPs: {', '.join(blocked_ips)}"
        return "No IP indicators found to block"
async def _disable_user(self, event_data: Dict) -> str:
"""Disable compromised user account"""
user = event_data.get('user') or event_data.get('username')
if user:
# Integration with identity provider (Active Directory, Azure AD, etc.)
disable_result = await self._call_identity_provider_api(
'disable_user', {'username': user}
)
return f"User {user} disabled successfully"
return "No user found to disable"
    async def _quarantine_file(self, event_data: Dict) -> str:
        """Quarantine malicious files"""
        indicators = event_data.get('indicators', [])
        file_indicators = [ind for ind in indicators if ind.get('type') in ['hash', 'file']]
        quarantined_files = []
        for indicator in file_indicators:
            file_hash = indicator.get('value')
            if file_hash:
                # Integration with endpoint security platform
                await self._call_endpoint_security_api(
                    'quarantine_file', {'file_hash': file_hash}
                )
                quarantined_files.append(file_hash[:8] + "...")
        if quarantined_files:
            return f"Quarantined files: {', '.join(quarantined_files)}"
        return "No file indicators found to quarantine"
async def _create_ticket(self, incident_id: str, event_data: Dict) -> str:
"""Create ticket in IT service management system"""
# Integration with ITSM (ServiceNow, Jira, etc.)
ticket_data = {
"title": f"Security Incident: {incident_id}",
"description": event_data.get('description', 'Automated security incident'),
"priority": self._map_severity_to_priority(event_data.get('severity', 'MEDIUM')),
"category": "Security",
"assigned_to": "security-team"
}
ticket_result = await self._call_itsm_api('create_ticket', ticket_data)
return f"Ticket created: {ticket_result.get('ticket_number', 'unknown')}"
async def _notify_security_team(self, incident_id: str, event_data: Dict) -> str:
"""Notify security team via multiple channels"""
notification_message = f"""
🚨 SECURITY INCIDENT ALERT 🚨
Incident ID: {incident_id}
Severity: {event_data.get('severity', 'MEDIUM')}
Description: {event_data.get('description', 'Automated security incident')}
Risk Score: {event_data.get('enriched_risk_score', 50)}
Affected Assets: {len(event_data.get('affected_assets', []))}
Threat Indicators: {len(event_data.get('indicators', []))}
Please review and take appropriate action.
"""
# Send to multiple channels
channels_notified = []
# Slack notification
slack_result = await self._send_slack_notification(notification_message)
if slack_result:
channels_notified.append("Slack")
# Email notification
email_result = await self._send_email_notification(
"Security Team", "security-team@company.com",
f"Security Incident: {incident_id}", notification_message
)
if email_result:
channels_notified.append("Email")
# PagerDuty for critical incidents
if event_data.get('severity') == 'CRITICAL':
pagerduty_result = await self._create_pagerduty_incident(incident_id, event_data)
if pagerduty_result:
channels_notified.append("PagerDuty")
return f"Notifications sent via: {', '.join(channels_notified)}"
# Mock API integration methods (implement with actual APIs)
async def _call_endpoint_security_api(self, action: str, params: Dict) -> Dict:
# Integration with CrowdStrike, Carbon Black, etc.
return {"status": "success", "action": action, "params": params}
async def _call_firewall_api(self, action: str, params: Dict) -> Dict:
# Integration with Palo Alto, Fortinet, etc.
return {"status": "success", "action": action, "params": params}
async def _call_identity_provider_api(self, action: str, params: Dict) -> Dict:
# Integration with Active Directory, Azure AD, etc.
return {"status": "success", "action": action, "params": params}
async def _call_itsm_api(self, action: str, params: Dict) -> Dict:
# Integration with ServiceNow, Jira, etc.
return {"status": "success", "ticket_number": "INC123456"}
async def _send_slack_notification(self, message: str) -> bool:
# Slack webhook integration
return True
async def _send_email_notification(self, name: str, email: str, subject: str, body: str) -> bool:
# Email service integration
return True
async def _create_pagerduty_incident(self, incident_id: str, event_data: Dict) -> bool:
# PagerDuty API integration
return True
Performance and Scalability Metrics
Event Processing Performance
| Component | Throughput | Latency | Resource Usage |
|---|---|---|---|
| CloudEvents Ingestion | 10K events/sec | < 50ms | 2 CPU, 4GB RAM |
| Event Enrichment | 5K events/sec | 100-300ms | 4 CPU, 8GB RAM |
| SIEM Integration | 8K events/sec | < 100ms | 2 CPU, 4GB RAM |
| Automated Response | 1K actions/sec | 500ms-5min | 1 CPU, 2GB RAM |
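These per-replica figures translate directly into capacity planning. A minimal sketch, assuming the illustrative throughput numbers from the table above and a 20% headroom factor (both assumptions, not benchmarks of any specific deployment):

```python
import math

# Per-replica throughput, taken from the table above (events/sec).
STAGE_THROUGHPUT = {
    "ingestion": 10_000,
    "enrichment": 5_000,
    "siem_integration": 8_000,
}

def replicas_needed(target_eps: int, headroom: float = 1.2) -> dict:
    """Replicas per pipeline stage to sustain target_eps with headroom."""
    return {
        stage: math.ceil(target_eps * headroom / throughput)
        for stage, throughput in STAGE_THROUGHPUT.items()
    }

plan = replicas_needed(25_000)
print(plan)  # enrichment needs the most replicas: it is the bottleneck
```

For a 25K events/sec target, enrichment at 5K events/sec per replica dominates the replica count, which is the usual pattern: the stage doing external lookups scales worst and deserves the most tuning attention.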
Business Impact Analysis
Implementation Costs:
- Event-driven architecture setup: 4-6 months engineering
- SIEM integration and tuning: 2-3 months
- SOAR playbook development: 3-4 months
- Training and process development: 2-3 months
Security Improvements:
- 80% reduction in incident response time (from hours to minutes)
- 95% automation of routine security operations
- 70% improvement in threat detection accuracy
- 90% reduction in false positive investigation time
ROI Calculation:
# Annual event-driven security value
FASTER_INCIDENT_RESPONSE_VALUE = 2000000 # USD from faster response
AUTOMATED_OPERATIONS_SAVINGS = 1500000 # USD from automation
IMPROVED_DETECTION_VALUE = 1000000 # USD from better detection
REDUCED_FALSE_POSITIVES = 800000 # USD from efficiency
TOTAL_VALUE = (FASTER_INCIDENT_RESPONSE_VALUE + AUTOMATED_OPERATIONS_SAVINGS +
               IMPROVED_DETECTION_VALUE + REDUCED_FALSE_POSITIVES)
# Total Value: $5,300,000 annually
IMPLEMENTATION_COST = 700000 # Total first year
ROI = ((TOTAL_VALUE - IMPLEMENTATION_COST) / IMPLEMENTATION_COST) * 100
# ROI: 657% in first year
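The same calculation, packaged as a small runnable helper (the dollar figures are the illustrative estimates above, not measured results):

```python
def first_year_roi(annual_value: float, implementation_cost: float) -> float:
    """Return first-year ROI as a percentage."""
    return (annual_value - implementation_cost) / implementation_cost * 100

# Illustrative value estimates from the section above.
total_value = 2_000_000 + 1_500_000 + 1_000_000 + 800_000  # $5.3M
roi = first_year_roi(total_value, 700_000)
print(f"{roi:.0f}%")  # prints: 657%
```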
Conclusion
Event-driven security automation with CloudEvents and SIEM integration transforms security operations from reactive to proactive, enabling real-time threat detection and automated response at enterprise scale. By implementing standardized event formats, intelligent correlation, and automated orchestration, organizations can significantly reduce response times while improving security outcomes.
The key to success lies in starting with event standardization, implementing comprehensive enrichment, and gradually building automation based on proven playbooks and organizational capabilities.
Remember that event-driven security is not about replacing human analysts - it’s about empowering them with intelligent automation that handles routine tasks and escalates complex threats with rich context and suggested actions.
Your event-driven security journey starts with standardizing your first security event. Begin today.