OP.MON.2 Metrics system
☑️ Applicability of the measure
In accordance with ANNEX II, 2 Selection of security measures, the security measure OP.MON.2 Metrics system applies given the security category of the system.
Reference documents
- ISO/IEC 27000:
  - 27002:2013:
    - 12.1.3 - Capacity management
    - 16.1.7 - Collection of evidence
- NIST SP 800-53 rev4:
  - [AU-6] Audit Review, Analysis, and Reporting
  - [SI-4] Information System Monitoring
  - [CA-7] Continuous Monitoring
- Other references:
  - FDA Cybersecurity in Medical Devices: Quality System Considerations
  - ISO 13485:2016 - Medical devices quality management systems
  - MDR Regulation (EU) 2017/745
Implementation guide
- A metrics system shall be established that makes it possible to monitor the security of the managed domain, detect deviations from expected behavior, and estimate capacity.
It shall include at least:
- Definition of the parameters and indicators to be measured
- Tools for collecting, storing, and analyzing metrics
- Procedures for analysis and report generation
- Procedures for managing deviations
Implementation in Legit Health Plus
1. Metrics framework for the medical device
The Legit Health Plus metrics system integrates technical, clinical, and regulatory monitoring to ensure the safe and effective operation of the Class IIa medical device.
1.1 Metric categories
Clinical safety metrics
Clinical_Safety_Metrics:
  Diagnostic_Accuracy:
    - "AI model precision: >= 94.5%"
    - "Recall rate: >= 92.0%"
    - "False positive rate: <= 3.0%"
    - "Clinical correlation score: >= 0.85"
  Patient_Safety:
    - "Critical diagnosis response time: <= 2 minutes"
    - "System availability during clinical hours: >= 99.9%"
    - "Data integrity verification: 100%"
    - "Clinical workflow completion rate: >= 98%"
  Quality_Indicators:
    - "Image quality acceptance rate: >= 95%"
    - "Clinical report completeness: 100%"
    - "Professional user satisfaction: >= 4.5/5"
    - "Patient outcome correlation: monitored"
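The diagnostic accuracy thresholds above can be checked mechanically once the model output has been compared with a reference standard. The following is a minimal sketch, not the product's actual evaluation code: the `ConfusionCounts` type, the function names, and the sample counts are hypothetical, and only the three threshold values are taken from the list above.

```python
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class ConfusionCounts:
    """Counts from comparing model output with a reference standard (hypothetical type)."""
    tp: int  # true positives
    fp: int  # false positives
    tn: int  # true negatives
    fn: int  # false negatives


def _ratio(numerator: int, denominator: int) -> float:
    # An undefined ratio is reported as NaN so that it never passes a threshold.
    return numerator / denominator if denominator else math.nan


def diagnostic_metrics(c: ConfusionCounts) -> dict:
    """Return precision, recall and false positive rate as fractions in [0, 1]."""
    return {
        "precision": _ratio(c.tp, c.tp + c.fp),
        "recall": _ratio(c.tp, c.tp + c.fn),
        "false_positive_rate": _ratio(c.fp, c.fp + c.tn),
    }


# Threshold values copied from the Diagnostic_Accuracy list.
THRESHOLDS = {
    "precision": (">=", 0.945),
    "recall": (">=", 0.920),
    "false_positive_rate": ("<=", 0.030),
}


def check_thresholds(metrics: dict) -> dict:
    """Map each metric name to True when the value meets its threshold."""
    result = {}
    for name, (op, limit) in THRESHOLDS.items():
        value = metrics[name]
        result[name] = value >= limit if op == ">=" else value <= limit
    return result


if __name__ == "__main__":
    sample = ConfusionCounts(tp=950, fp=40, tn=1960, fn=50)  # illustrative counts only
    print(check_thresholds(diagnostic_metrics(sample)))
```

Keeping the thresholds in a single table makes a change to an acceptance limit a one-line, reviewable modification.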
Cybersecurity metrics
Cybersecurity_Metrics:
  Threat_Detection:
    - "Mean Time to Detection (MTTD): <= 5 minutes"
    - "False positive rate: <= 5%"
    - "Security alert response time: <= 15 minutes"
    - "Threat intelligence coverage: >= 95%"
  Access_Control:
    - "Authentication success rate: >= 99.5%"
    - "Unauthorized access attempts: monitored"
    - "Privileged account monitoring: 100%"
    - "Session anomaly detection rate: >= 90%"
  Data_Protection:
    - "Encryption coverage: 100%"
    - "Data loss incidents: 0"
    - "Backup integrity verification: 100%"
    - "Data retention compliance: 100%"
System performance metrics
System_Performance_Metrics:
  Availability:
    - "System uptime: >= 99.9%"
    - "Planned downtime: <= 4 hours/month"
    - "Recovery time objective (RTO): <= 30 minutes"
    - "Recovery point objective (RPO): <= 5 minutes"
  Performance:
    - "API response time: <= 2 seconds (95th percentile)"
    - "Image processing time: <= 30 seconds"
    - "Database query performance: <= 100ms"
    - "Concurrent user capacity: >= 200"
  Resource_Utilization:
    - "CPU utilization: <= 70% (normal operation)"
    - "Memory utilization: <= 80%"
    - "Storage growth rate: monitored"
    - "Network bandwidth utilization: <= 60%"
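As a worked illustration of the availability figures above, an uptime percentage can be converted into a downtime budget. This is a sketch under stated assumptions: the function names are hypothetical and a 30-day month is assumed. It shows that 99.9% uptime leaves roughly 43 minutes of downtime per month, so the 4 hours/month planned downtime allowance is presumably accounted for outside the uptime target; that reading is an assumption, not something the list states.

```python
def allowed_downtime_minutes(availability_pct: float, period_days: int = 30) -> float:
    """Downtime budget, in minutes, implied by an availability target over a period."""
    total_minutes = period_days * 24 * 60
    return total_minutes * (1 - availability_pct / 100)


def measured_availability_pct(downtime_minutes: float, period_days: int = 30) -> float:
    """Availability percentage achieved given the observed downtime over a period."""
    total_minutes = period_days * 24 * 60
    return (1 - downtime_minutes / total_minutes) * 100


if __name__ == "__main__":
    budget = allowed_downtime_minutes(99.9)
    print(f"99.9% over 30 days allows {budget:.1f} minutes of downtime")
    # Four hours of downtime counted against uptime would miss the 99.9% target.
    print(f"240 minutes of downtime gives {measured_availability_pct(240):.2f}% availability")
```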
2. Metrics system architecture
2.1 Monitoring stack
Monitoring_Stack:
  Metrics_Collection:
    - "Prometheus + Node Exporter"
    - "Application metrics (custom)"
    - "Medical device specific collectors"
    - "Cloud provider metrics (AWS CloudWatch)"
  Time_Series_Storage:
    - "Primary: Prometheus TSDB"
    - "Long-term: InfluxDB"
    - "Retention: 2 years (regulatory requirement)"
  Visualization:
    - "Grafana dashboards"
    - "Medical-specific views"
    - "Executive reporting"
    - "Regulatory compliance dashboards"
  Alerting:
    - "Prometheus Alertmanager"
    - "PagerDuty integration"
    - "Medical emergency escalation"
    - "Regulatory notification automation"
2.2 Collection architecture
# Custom collector for medical metrics.
# PrometheusGateway, ClinicalDatabase and AIModelMonitor are assumed to be
# project classes defined elsewhere.
class MedicalDeviceMetricsCollector:
    def __init__(self):
        self.prometheus_gateway = PrometheusGateway()
        self.clinical_db = ClinicalDatabase()
        self.ai_model_monitor = AIModelMonitor()

    def collect_clinical_metrics(self):
        """Collect critical clinical metrics."""
        metrics = {
            # AI model accuracy
            'ai_model_accuracy': self.ai_model_monitor.get_current_accuracy(),
            # Clinical response time
            'clinical_response_time': self.measure_clinical_response_time(),
            # Availability of the diagnosis service
            'diagnosis_service_availability': self.check_diagnosis_service_health(),
            # Integrity of clinical data
            'clinical_data_integrity': self.verify_clinical_data_integrity(),
            # Completion rate of clinical workflows
            'clinical_workflow_completion': self.calculate_workflow_completion_rate()
        }
        # Send to Prometheus
        for metric_name, metric_value in metrics.items():
            self.prometheus_gateway.send_metric(
                job='medical_device_metrics',
                metric_name=metric_name,
                metric_value=metric_value,
                labels={'device_type': 'dermatology_ai', 'class': 'IIa'}
            )
        return metrics

    def collect_regulatory_compliance_metrics(self):
        """Metrics specific to regulatory compliance."""
        return {
            'mdr_compliance_score': self.calculate_mdr_compliance(),
            'fda_cybersecurity_score': self.calculate_fda_cybersecurity_compliance(),
            'audit_trail_completeness': self.verify_audit_trail_completeness(),
            'incident_response_readiness': self.assess_incident_response_readiness()
        }
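The collector above hands its values to a gateway whose wire format is not shown. As an illustration only, the sketch below renders a metrics dictionary in the Prometheus text exposition format using the standard library. The function names are hypothetical, and this is not presented as the behavior of `send_metric`.

```python
def escape_label_value(value: str) -> str:
    """Escape backslash, double quote and newline as the text format requires."""
    return value.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")


def render_gauges(metrics: dict, labels: dict) -> str:
    """Render each metric as a gauge sample sharing one label set."""
    label_text = ",".join(
        f'{key}="{escape_label_value(str(val))}"' for key, val in sorted(labels.items())
    )
    selector = f"{{{label_text}}}" if label_text else ""
    lines = []
    for name, value in sorted(metrics.items()):
        lines.append(f"# TYPE {name} gauge")
        lines.append(f"{name}{selector} {float(value)}")
    # The format expects every line, including the last one, to end with a newline.
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(render_gauges(
        {"ai_model_accuracy": 0.95, "clinical_workflow_completion": 0.99},  # sample values
        {"device_type": "dermatology_ai", "class": "IIa"},
    ), end="")
```

Sorting metric and label names keeps the output deterministic, which makes it easier to compare scrapes during an audit.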
3. Specialized dashboards
3.1 Clinical executive dashboard
Clinical_Executive_Dashboard:
  Overview_Panels:
    - "Diagnostic Accuracy Trend (30 days)"
    - "System Availability (Clinical Hours)"
    - "Patient Safety Indicators"
    - "Critical Alerts Summary"
  Clinical_KPIs:
    - "Daily Diagnoses Processed"
    - "Average Diagnosis Confidence Score"
    - "Professional User Adoption Rate"
    - "Clinical Outcome Tracking"
  Regulatory_Status:
    - "MDR Compliance Status"
    - "FDA Cybersecurity Compliance"
    - "Incident Response Metrics"
    - "Audit Readiness Score"
3.2 Technical operations dashboard
{
"dashboard": {
"title": "Legit Health Plus - Technical Operations",
"panels": [
{
"title": "System Health Overview",
"type": "stat",
"targets": [
{
"expr": "up{job=\"diagnosis-service\"}",
"legendFormat": "Diagnosis Service Status"
},
{
"expr": "postgresql_up{instance=~\".*clinical.*\"}",
"legendFormat": "Clinical Database Status"
}
]
},
{
"title": "API Performance",
"type": "graph",
"targets": [
{
"expr": "histogram_quantile(0.95, sum by (le) (rate(http_request_duration_seconds_bucket{job=\"medical-api\"}[5m])))",
"legendFormat": "95th Percentile Response Time"
}
]
},
{
"title": "AI Model Performance",
"type": "graph",
"targets": [
{
"expr": "ai_model_accuracy_score{model_type=\"dermatology\"}",
"legendFormat": "Model Accuracy"
},
{
"expr": "ai_inference_time_seconds{model_type=\"dermatology\"}",
"legendFormat": "Inference Time"
}
]
}
]
}
}
4. Alerts and critical thresholds
4.1 Medical alert configuration
Medical_Critical_Alerts:
  Diagnostic_System_Down:
    condition: 'up{job="diagnosis-service"} == 0'
    duration: "1m"
    severity: "critical"
    escalation:
      - "Immediate: CTO, CMO"
      - "5min: CEO, Board"
      - "15min: Regulatory notification preparation"
  AI_Model_Accuracy_Degradation:
    condition: "ai_model_accuracy_score < 0.92"
    duration: "5m"
    severity: "critical"
    escalation:
      - "Immediate: AI Team Lead, CMO"
      - "10min: Clinical Safety Officer"
      - "30min: Model rollback procedure"
  Clinical_Data_Integrity_Issue:
    condition: "clinical_data_integrity_score < 1.0"
    duration: "0s" # Immediate
    severity: "critical"
    escalation:
      - "Immediate: CTO, CMO, Legal"
      - "1min: Database administrator"
      - "5min: Forensic data collection"
  Patient_Safety_Alert:
    condition: "critical_diagnosis_response_time > 120" # 2 minutes
    duration: "1m"
    severity: "high"
    escalation:
      - "Immediate: CMO, Clinical Team"
      - "5min: System performance team"
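Each alert above pairs a condition with a duration: the condition has to hold continuously for that long before the alert fires. The sketch below illustrates that logic in plain Python. The `PendingAlert` class is hypothetical and simplified; it does not reproduce how Prometheus evaluates rules at fixed intervals.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class PendingAlert:
    """Fires only after the condition has held for hold_seconds without interruption."""
    name: str
    condition: Callable[[float], bool]
    hold_seconds: float
    _since: Optional[float] = field(default=None, init=False, repr=False)

    def observe(self, value: float, now: float) -> bool:
        """Record one sample taken at time `now` (seconds); return True if firing."""
        if not self.condition(value):
            self._since = None  # condition cleared, so the timer restarts
            return False
        if self._since is None:
            self._since = now
        return now - self._since >= self.hold_seconds


if __name__ == "__main__":
    # Mirrors AI_Model_Accuracy_Degradation: accuracy below 0.92 for 5 minutes.
    alert = PendingAlert("AI_Model_Accuracy_Degradation", lambda v: v < 0.92, 300)
    for t, accuracy in [(0, 0.91), (200, 0.90), (250, 0.95), (300, 0.91), (600, 0.91)]:
        print(t, alert.observe(accuracy, now=t))
```

A duration of "0s", as used for the data integrity alert, makes the alert fire on the first sample that violates the condition.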
4.2 Automated response script
#!/usr/bin/env python3
# Automated response to critical alerts
import json
import requests
from datetime import datetime


class MedicalAlertHandler:
    def __init__(self):
        self.notification_channels = {
            'slack': 'https://hooks.slack.com/services/...medical-alerts',
            'pagerduty': 'https://events.pagerduty.com/v2/enqueue',
            'sms': 'https://api.twilio.com/2010-04-01/Accounts/...'
        }

    def handle_critical_alert(self, alert_data):
        """Handle critical alerts with an automated response."""
        alert_type = alert_data.get('alertname')
        severity = alert_data.get('severity', 'unknown')
        # Store the bound methods, not their results, so that each action
        # runs inside the try/except below and failures are recorded per action.
        response_actions = []
        if alert_type == 'DiagnosticSystemDown':
            response_actions = [
                self.activate_failover_system,
                self.notify_clinical_teams,
                self.prepare_patient_communication,
                self.initiate_technical_response
            ]
        elif alert_type == 'AIModelAccuracyDegradation':
            response_actions = [
                self.rollback_ai_model,
                self.notify_clinical_safety_officer,
                self.preserve_model_state_for_analysis,
                self.activate_manual_review_process
            ]
        elif alert_type == 'ClinicalDataIntegrityIssue':
            response_actions = [
                self.isolate_affected_data,
                self.initiate_forensic_collection,
                self.notify_legal_and_compliance,
                self.prepare_regulatory_notification
            ]
        # Execute the response actions
        results = []
        for action in response_actions:
            try:
                result = action()
                results.append({'action': action.__name__, 'status': 'success', 'result': result})
            except Exception as e:
                results.append({'action': action.__name__, 'status': 'error', 'error': str(e)})
        return results
5. Predictive analysis and capacity planning
5.1 Predictive model for resources
import math


class MedicalSystemCapacityAnalyzer:
    def __init__(self):
        self.historical_data = self.load_historical_metrics()
        self.growth_models = self.load_growth_prediction_models()

    def predict_clinical_workload(self, forecast_days=30):
        """Predict the future clinical workload."""
        features = {
            'historical_diagnoses_per_day': self.get_daily_diagnosis_trends(),
            'seasonal_patterns': self.analyze_seasonal_clinical_patterns(),
            'new_hospital_adoptions': self.get_adoption_pipeline(),
            'ai_model_efficiency_trends': self.analyze_model_efficiency()
        }
        predictions = {
            'expected_daily_diagnoses': self.growth_models['workload'].predict(features),
            'peak_concurrent_users': self.growth_models['concurrency'].predict(features),
            'storage_requirements': self.growth_models['storage'].predict(features),
            'compute_requirements': self.growth_models['compute'].predict(features)
        }
        return predictions

    def generate_capacity_recommendations(self):
        """Generate capacity recommendations."""
        predictions = self.predict_clinical_workload()
        current_capacity = self.assess_current_capacity()
        recommendations = []
        if predictions['peak_concurrent_users'] > current_capacity['max_users'] * 0.8:
            recommendations.append({
                'type': 'scale_up',
                'component': 'web_servers',
                'current': current_capacity['web_servers'],
                # Round up so that the recommendation is a whole number of servers.
                'recommended': math.ceil(current_capacity['web_servers'] * 1.5),
                'timeline': '2 weeks',
                'reason': 'Expected user growth will exceed 80% capacity'
            })
        return recommendations
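The growth models used by the analyzer are loaded from elsewhere and are not described. As a stand-in for illustration only, the sketch below fits a straight line to a daily series by ordinary least squares and extrapolates it, then applies the same 80% headroom rule as the recommendation logic. The function names and sample data are hypothetical, and a linear trend is an assumption that real workloads with seasonality may not satisfy.

```python
def fit_line(ys):
    """Least-squares slope and intercept for points (0, ys[0]), (1, ys[1]), ..."""
    n = len(ys)
    if n < 2:
        raise ValueError("need at least two observations")
    mean_x = (n - 1) / 2
    mean_y = sum(ys) / n
    sxx = sum((x - mean_x) ** 2 for x in range(n))
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in enumerate(ys))
    slope = sxy / sxx
    return slope, mean_y - slope * mean_x


def forecast(ys, days_ahead):
    """Extrapolate the fitted line days_ahead days past the last observation."""
    slope, intercept = fit_line(ys)
    return intercept + slope * (len(ys) - 1 + days_ahead)


def needs_scale_up(predicted_peak, max_capacity, headroom=0.8):
    """True when the predicted peak exceeds the headroom fraction of capacity."""
    return predicted_peak > max_capacity * headroom


if __name__ == "__main__":
    peak_users = [100, 110, 120, 130]  # illustrative daily peaks
    predicted = forecast(peak_users, days_ahead=30)
    print(predicted, needs_scale_up(predicted, max_capacity=500))
```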
10. ENS executive metrics dashboard
10.1 Executive dashboard design
ENS_Executive_Dashboard:
  Overview_Section:
    Primary_KPIs:
      - "ENS Compliance Score: XX%"
      - "Critical Systems Availability: XX%"
      - "Active Security Incidents: XX"
      - "Days Since Last Major Incident: XX"
    Status_Indicators:
      - "Organizational Framework Status: GREEN/YELLOW/RED"
      - "Operational Framework Status: GREEN/YELLOW/RED"
      - "Protection Measures Status: GREEN/YELLOW/RED"
      - "Regulatory Compliance Status: GREEN/YELLOW/RED"
  Detailed_Sections:
    Organizational_Framework:
      - "Security Policy Compliance: XX%"
      - "Personnel Security Training: XX%"
      - "Authorization Process Effectiveness: XX%"
      - "Incident Response Readiness: XX%"
    Operational_Framework:
      - "Risk Assessment Currency: XX days"
      - "Access Control Effectiveness: XX%"
      - "Change Management Compliance: XX%"
      - "Monitoring System Health: XX%"
    Protection_Measures:
      - "Infrastructure Security Score: XX%"
      - "Information Protection Level: XX%"
      - "Communications Security: XX%"
      - "Application Security Posture: XX%"
10.2 Technical implementation of the dashboard
class ENSExecutiveDashboard:
    def __init__(self):
        self.metrics_collector = ENSMetricsCollector()
        self.compliance_calculator = ENSComplianceCalculator()
        self.trend_analyzer = TrendAnalyzer()
        self.alert_manager = AlertManager()

    def generate_executive_summary(self, time_period='current_month'):
        """Generate the executive summary of ENS metrics."""
        # Collect metrics from all frameworks
        raw_metrics = self.metrics_collector.collect_all_ens_metrics(time_period)
        # Calculate compliance scores
        compliance_scores = self.compliance_calculator.calculate_framework_compliance(raw_metrics)
        # Trend analysis
        trends = self.trend_analyzer.analyze_trends(raw_metrics, time_period)
        dashboard_data = {
            'summary': {
                'overall_ens_compliance': compliance_scores['overall'],
                'critical_systems_availability': raw_metrics['system_availability']['average'],
                'active_incidents': raw_metrics['security_incidents']['active_count'],
                'days_since_major_incident': raw_metrics['security_incidents']['days_since_major']
            },
            'framework_status': {
                'organizational': {
                    'status': self.determine_status_color(compliance_scores['organizational']),
                    'score': compliance_scores['organizational'],
                    'trend': trends['organizational']['direction']
                },
                'operational': {
                    'status': self.determine_status_color(compliance_scores['operational']),
                    'score': compliance_scores['operational'],
                    'trend': trends['operational']['direction']
                },
                'protection_measures': {
                    'status': self.determine_status_color(compliance_scores['protection_measures']),
                    'score': compliance_scores['protection_measures'],
                    'trend': trends['protection_measures']['direction']
                }
            },
            'key_metrics': {
                'security_training_completion': raw_metrics['personnel']['training_completion'],
                'vulnerability_remediation_time': raw_metrics['vulnerabilities']['avg_remediation_time'],
                'backup_success_rate': raw_metrics['backup']['success_rate'],
                'incident_response_time': raw_metrics['incidents']['avg_response_time']
            },
            'alerts': self.get_executive_alerts(),
            'recommendations': self.generate_executive_recommendations(compliance_scores, trends)
        }
        return dashboard_data
    def determine_status_color(self, score):
        """Determine the status color from the score."""
        if score >= 90:
            return 'GREEN'
        elif score >= 75:
            return 'YELLOW'
        else:
            return 'RED'
    def get_executive_alerts(self):
        """Get critical alerts for the executive level."""
        critical_alerts = self.alert_manager.get_alerts(
            severity=['CRITICAL', 'HIGH'],
            audience='executive',
            time_window='24h'
        )
        formatted_alerts = []
        for alert in critical_alerts:
            formatted_alerts.append({
                'title': alert['title'],
                'severity': alert['severity'],
                'impact': alert['business_impact'],
                'eta_resolution': alert['estimated_resolution_time'],
                'owner': alert['responsible_team']
            })
        return formatted_alerts[:10]  # Cap at 10 alerts, in the order returned
    def generate_executive_recommendations(self, compliance_scores, trends):
        """Generate recommendations for the executive level."""
        recommendations = []
        # Recommendations based on low scores
        if compliance_scores['organizational'] < 80:
            recommendations.append({
                'priority': 'HIGH',
                'category': 'Organizational Framework',
                'recommendation': 'Strengthen security policies and training programs',
                'estimated_effort': '2-4 weeks',
                'business_risk': 'Potential regulatory non-compliance'
            })
        if compliance_scores['operational'] < 75:
            recommendations.append({
                'priority': 'CRITICAL',
                'category': 'Operational Framework',
                'recommendation': 'Implement improvements in access management and monitoring',
                'estimated_effort': '4-8 weeks',
                'business_risk': 'Increased security incident risk'
            })
        # Recommendations based on negative trends
        for framework, trend_data in trends.items():
            if trend_data['direction'] == 'DECLINING' and trend_data['severity'] == 'SIGNIFICANT':
                recommendations.append({
                    'priority': 'HIGH',
                    'category': f'{framework.title()} Framework',
                    'recommendation': f'Investigate and correct the declining trend in {framework}',
                    'estimated_effort': '1-2 weeks',
                    'business_risk': 'Degradation of security posture'
                })
        return recommendations[:5]  # Cap at 5 recommendations, in the order added
11. Automated metrics system
11.1 Automated metrics collection
class ENSAutomatedMetricsCollector:
    def __init__(self):
        self.data_sources = {
            'prometheus': PrometheusClient(),
            'splunk': SplunkClient(),
            'active_directory': ADClient(),
            'vulnerability_scanner': NessusClient(),
            'backup_system': VeeamClient(),
            'firewall': FortiGateClient(),
            'endpoint_protection': CrowdStrikeClient()
        }
        self.metric_definitions = self.load_ens_metric_definitions()

    def collect_ens_control_metrics(self):
        """Collect metrics for all ENS controls."""
        collected_metrics = {}
        # Organizational framework
        collected_metrics['organizational'] = {
            'org_1_security_policy': self.collect_policy_metrics(),
            'org_2_security_regulations': self.collect_regulation_compliance_metrics(),
            'org_3_operational_procedures': self.collect_procedure_compliance_metrics(),
            'org_4_authorization_process': self.collect_authorization_metrics()
        }
        # Operational framework - Planning
        collected_metrics['operational_planning'] = {
            'op_pl_1_risk_analysis': self.collect_risk_analysis_metrics(),
            'op_pl_2_security_architecture': self.collect_architecture_metrics(),
            'op_pl_3_component_acquisition': self.collect_acquisition_metrics(),
            'op_pl_4_capacity_management': self.collect_capacity_metrics(),
            'op_pl_5_certified_components': self.collect_certification_metrics()
        }
        # Operational framework - Access control
        collected_metrics['access_control'] = {
            'op_acc_1_identification': self.collect_identification_metrics(),
            'op_acc_2_access_requirements': self.collect_access_requirement_metrics(),
            'op_acc_3_segregation_duties': self.collect_segregation_metrics(),
            'op_acc_4_access_management': self.collect_access_mgmt_metrics(),
            'op_acc_5_authentication': self.collect_authentication_metrics(),
            'op_acc_6_local_access': self.collect_local_access_metrics(),
            'op_acc_7_remote_access': self.collect_remote_access_metrics()
        }
        # Operational framework - Operation
        collected_metrics['exploitation'] = {
            'op_exp_1_asset_inventory': self.collect_asset_inventory_metrics(),
            'op_exp_2_security_configuration': self.collect_config_metrics(),
            'op_exp_3_configuration_management': self.collect_config_mgmt_metrics(),
            'op_exp_4_maintenance': self.collect_maintenance_metrics(),
            'op_exp_5_change_management': self.collect_change_metrics(),
            'op_exp_6_malware_protection': self.collect_malware_metrics(),
            'op_exp_7_incident_management': self.collect_incident_metrics(),
            'op_exp_8_user_activity_logging': self.collect_user_logging_metrics(),
            'op_exp_9_incident_logging': self.collect_incident_logging_metrics(),
            'op_exp_10_log_protection': self.collect_log_protection_metrics(),
            'op_exp_11_crypto_key_protection': self.collect_crypto_metrics()
        }
        return collected_metrics
    def collect_policy_metrics(self):
        """Metrics specific to ORG.1 - Security policy."""
        return {
            'policy_approval_date': self.get_policy_approval_date(),
            'policy_review_currency': self.calculate_policy_age_days(),
            'policy_awareness_training_completion': self.get_training_completion_rate('security_policy'),
            'policy_exceptions_count': self.count_active_policy_exceptions(),
            'policy_compliance_score': self.calculate_policy_compliance_score()
        }
    def collect_risk_analysis_metrics(self):
        """Metrics specific to OP.PL.1 - Risk analysis."""
        return {
            'risk_assessment_currency': self.get_days_since_last_risk_assessment(),
            'high_risk_count': self.count_risks_by_level('HIGH'),
            'critical_risk_count': self.count_risks_by_level('CRITICAL'),
            'risk_treatment_completion_rate': self.calculate_risk_treatment_rate(),
            'residual_risk_acceptance_rate': self.calculate_residual_risk_acceptance()
        }
    def collect_access_mgmt_metrics(self):
        """Metrics specific to OP.ACC.4 - Access rights management."""
        ad_client = self.data_sources['active_directory']
        return {
            'user_account_count': ad_client.get_total_user_count(),
            'privileged_account_count': ad_client.get_privileged_account_count(),
            'inactive_account_count': ad_client.get_inactive_accounts_count(days=90),
            'access_review_completion_rate': self.calculate_access_review_completion(),
            'orphaned_account_count': ad_client.get_orphaned_accounts_count(),
            'password_policy_compliance': ad_client.get_password_policy_compliance_rate(),
            'failed_authentication_rate': ad_client.get_failed_authentication_rate('24h')
        }
12. Automated regulatory reports
12.1 Automatic generation of ENS reports
from datetime import datetime


class ENSRegulatoryReporting:
    def __init__(self):
        self.metrics_collector = ENSAutomatedMetricsCollector()
        self.template_engine = ReportTemplateEngine()
        self.compliance_calculator = ENSComplianceCalculator()

    def generate_monthly_ens_report(self, month, year):
        """Generate the monthly ENS compliance report."""
        # Collect the metrics for the period
        period_metrics = self.metrics_collector.collect_period_metrics(month, year)
        # Calculate compliance indices
        compliance_indices = self.compliance_calculator.calculate_all_indices(period_metrics)
        # Identify deviations and non-conformities
        deviations = self.identify_compliance_deviations(period_metrics)
        report_data = {
            'report_info': {
                'period': f'{month}/{year}',
                'generation_date': datetime.now().isoformat(),
                'organization': 'Legit Health',
                'system': 'Legit Health Plus (Class IIa)',
                'ens_category': 'MEDIO'
            },
            'executive_summary': {
                'overall_compliance': compliance_indices['overall'],
                'framework_compliance': {
                    'organizational': compliance_indices['organizational'],
                    'operational': compliance_indices['operational'],
                    'protection_measures': compliance_indices['protection_measures']
                },
                'critical_findings': len([d for d in deviations if d['severity'] == 'CRITICAL']),
                'improvement_actions': self.generate_improvement_actions(deviations)
            },
            'detailed_metrics': period_metrics,
            'compliance_analysis': compliance_indices,
            'deviations': deviations,
            'trend_analysis': self.generate_trend_analysis(month, year),
            'recommendations': self.generate_monthly_recommendations(compliance_indices, deviations)
        }
        # Generate the formatted report
        formatted_report = self.template_engine.generate_ens_compliance_report(report_data)
        return formatted_report

    def generate_improvement_actions(self, deviations):
        """Generate improvement actions based on deviations."""
        # Rank severities explicitly: sorting the strings alphabetically in
        # reverse would place 'HIGH' ahead of 'CRITICAL'.
        severity_rank = {'CRITICAL': 0, 'HIGH': 1}
        actions = []
        for deviation in deviations:
            if deviation['severity'] in severity_rank:
                action = {
                    'priority': deviation['severity'],
                    'control': deviation['control_id'],
                    'description': deviation['description'],
                    'recommended_action': self.get_remediation_action(deviation),
                    'target_date': self.calculate_target_date(deviation['severity']),
                    'responsible': self.assign_responsible_person(deviation['control_id'])
                }
                actions.append(action)
        return sorted(actions, key=lambda x: severity_rank[x['priority']])
6. Automated regulatory reporting
6.1 Automatic report generation
class RegulatoryReportGenerator:
    def __init__(self):
        self.metrics_db = MetricsDatabase()
        self.report_templates = self.load_regulatory_templates()

    def generate_mdr_compliance_report(self, period='monthly'):
        """Generate the MDR compliance report."""
        metrics = self.metrics_db.get_mdr_metrics(period)
        report = {
            'report_period': period,
            'device_identification': {
                'name': 'Legit Health Plus',
                'class': 'IIa',
                'udi_di': 'LH-DER-AI-2024-001'
            },
            'safety_performance': {
                'system_availability': metrics['availability_percentage'],
                'diagnostic_accuracy': metrics['ai_accuracy_average'],
                'incident_count': metrics['safety_incidents'],
                'user_reported_issues': metrics['user_complaints']
            },
            'cybersecurity_status': {
                'security_incidents': metrics['security_incidents'],
                'vulnerability_assessments': metrics['vulnerability_scans'],
                'access_control_effectiveness': metrics['access_control_score'],
                'data_protection_compliance': metrics['data_protection_score']
            },
            'post_market_surveillance': {
                'clinical_performance_data': metrics['clinical_outcomes'],
                'user_feedback_analysis': metrics['user_feedback_summary'],
                'device_modifications': metrics['device_changes']
            }
        }
        return self.format_mdr_report(report)
7. Integration with quality systems
7.1 Integration with the QMS
QMS_Integration:
  Risk_Management:
    - "Automated risk metric updates to R-TF-013-002"
    - "Real-time risk assessment based on system metrics"
    - "Integration with threat model monitoring"
  CAPA_System:
    - "Automatic CAPA generation for metric threshold violations"
    - "Root cause analysis automation"
    - "Effectiveness monitoring of implemented CAPAs"
  Document_Control:
    - "Version control integration for metric baselines"
    - "Change control workflow for threshold modifications"
    - "Approval process for new metrics implementation"
8. Advanced cybersecurity metrics
8.1 Behavioral analytics
class CybersecurityBehavioralMetrics:
    def __init__(self):
        self.baseline_models = self.load_behavioral_baselines()
        self.anomaly_detectors = self.load_anomaly_detection_models()

    def calculate_user_risk_score(self, user_id, time_window='24h'):
        """Calculate a risk score based on user behavior."""
        user_activity = self.get_user_activity(user_id, time_window)
        risk_factors = {
            'access_pattern_anomaly': self.detect_access_pattern_anomaly(user_activity),
            'data_volume_anomaly': self.detect_data_access_volume_anomaly(user_activity),
            'time_pattern_anomaly': self.detect_time_pattern_anomaly(user_activity),
            'location_anomaly': self.detect_location_anomaly(user_activity),
            'device_anomaly': self.detect_device_anomaly(user_activity)
        }
        # Calculate weighted risk score (the weights sum to 1.0)
        weights = {'access_pattern_anomaly': 0.3, 'data_volume_anomaly': 0.25,
                   'time_pattern_anomaly': 0.2, 'location_anomaly': 0.15, 'device_anomaly': 0.1}
        risk_score = sum(risk_factors[factor] * weights[factor] for factor in risk_factors)
        return {
            'user_id': user_id,
            'risk_score': risk_score,
            'risk_level': self.categorize_risk_level(risk_score),
            'contributing_factors': [f for f, score in risk_factors.items() if score > 0.7],
            'recommended_actions': self.get_risk_mitigation_actions(risk_score)
        }
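A worked example may help to read the weighted score. The sketch below reuses the five weights and the 0.7 contributing-factor cutoff from the class above. The anomaly values are invented for illustration and are assumed to lie in the range 0 to 1, which the source does not state explicitly.

```python
# Weights copied from calculate_user_risk_score; they sum to 1.0.
WEIGHTS = {
    "access_pattern_anomaly": 0.3,
    "data_volume_anomaly": 0.25,
    "time_pattern_anomaly": 0.2,
    "location_anomaly": 0.15,
    "device_anomaly": 0.1,
}


def weighted_risk_score(risk_factors: dict) -> float:
    """Weighted sum of anomaly scores; fails loudly if a factor is missing."""
    missing = WEIGHTS.keys() - risk_factors.keys()
    if missing:
        raise KeyError(f"missing risk factors: {sorted(missing)}")
    return sum(risk_factors[name] * weight for name, weight in WEIGHTS.items())


def contributing_factors(risk_factors: dict, cutoff: float = 0.7) -> list:
    """Names of the factors whose individual score exceeds the cutoff."""
    return [name for name in WEIGHTS if risk_factors[name] > cutoff]


if __name__ == "__main__":
    sample = {  # hypothetical anomaly scores for one user
        "access_pattern_anomaly": 0.9,
        "data_volume_anomaly": 0.8,
        "time_pattern_anomaly": 0.1,
        "location_anomaly": 0.0,
        "device_anomaly": 0.5,
    }
    # 0.9*0.3 + 0.8*0.25 + 0.1*0.2 + 0.0*0.15 + 0.5*0.1 = 0.54
    print(round(weighted_risk_score(sample), 2), contributing_factors(sample))
```

Because the weights sum to 1.0, the score stays within the same 0 to 1 range as the inputs, which keeps any categorization cutoffs comparable across users.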
13. Metrics review procedures
13.1 Periodic review meetings
Metrics_Review_Schedule:
  Daily_Reviews:
    Participants: "CTO, Operations Manager, Security Team Lead"
    Duration: "15 minutes"
    Focus: "Critical alerts, system health, immediate actions"
    Deliverables: "Daily security status report"
  Weekly_Reviews:
    Participants: "Executive team, Department heads"
    Duration: "1 hour"
    Focus: "Trend analysis, performance against targets, resource needs"
    Deliverables: "Weekly metrics dashboard, action items"
  Monthly_Reviews:
    Participants: "Board members, C-suite, Audit committee"
    Duration: "2 hours"
    Focus: "Compliance status, strategic alignment, budget implications"
    Deliverables: "Monthly compliance report, budget recommendations"
  Quarterly_Reviews:
    Participants: "External auditors, Regulatory consultants"
    Duration: "Half day"
    Focus: "ENS certification readiness, regulatory compliance"
    Deliverables: "Quarterly assurance report, certification roadmap"
13.2 Metrics escalation procedure
class MetricsEscalationManager:
    def __init__(self):
        self.escalation_matrix = {
            'CRITICAL': {
                'immediate': ['CTO', 'CEO'],
                'within_1h': ['Board Chair', 'Audit Committee'],
                'within_4h': ['External Auditor', 'Legal Counsel']
            },
            'HIGH': {
                'immediate': ['CTO'],
                'within_2h': ['CEO'],
                'within_8h': ['Department Heads']
            },
            'MEDIUM': {
                'within_4h': ['Operations Manager'],
                'within_24h': ['CTO']
            }
        }

    def process_metric_threshold_breach(self, metric_data):
        """Process a metric threshold breach."""
        severity = self.determine_breach_severity(metric_data)
        # Severities without an entry in the matrix produce no notifications.
        escalation_plan = self.escalation_matrix.get(severity, {})
        notifications_sent = []
        for timeframe, recipients in escalation_plan.items():
            notification = {
                'severity': severity,
                'metric': metric_data['metric_name'],
                'current_value': metric_data['current_value'],
                'threshold': metric_data['threshold'],
                'breach_time': metric_data['breach_time'],
                'recipients': recipients,
                'send_time': self.calculate_send_time(timeframe),
                'message': self.generate_escalation_message(metric_data, severity)
            }
            notifications_sent.append(notification)
            self.schedule_notification(notification)
        return notifications_sent
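The escalation matrix keys (`immediate`, `within_1h`, `within_24h`) have to be turned into concrete send times, a step the class delegates to `calculate_send_time` without showing it. One possible reading is sketched below. The parsing rules are an assumption, and unlike the method call in the class this version also takes the breach time as an explicit argument.

```python
import re
from datetime import datetime, timedelta

# Accepts keys such as "within_4h" or "within_30m" (the minutes form is an assumption).
_TIMEFRAME = re.compile(r"within_(\d+)([mh])")


def timeframe_to_delay(timeframe: str) -> timedelta:
    """Convert an escalation matrix key into a delay after the breach."""
    if timeframe == "immediate":
        return timedelta(0)
    match = _TIMEFRAME.fullmatch(timeframe)
    if match is None:
        raise ValueError(f"unrecognized timeframe: {timeframe!r}")
    amount, unit = int(match.group(1)), match.group(2)
    return timedelta(hours=amount) if unit == "h" else timedelta(minutes=amount)


def calculate_send_time(breach_time: datetime, timeframe: str) -> datetime:
    """Latest time at which the notification for this timeframe should go out."""
    return breach_time + timeframe_to_delay(timeframe)


if __name__ == "__main__":
    breach = datetime(2024, 1, 1, 12, 0)  # illustrative timestamp
    for key in ("immediate", "within_1h", "within_4h", "within_24h"):
        print(key, calculate_send_time(breach, key).isoformat())
```

Rejecting unknown keys with an exception, rather than defaulting to an immediate send, surfaces typos in the matrix during testing instead of during an incident.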
14. KPI tracking for ENS compliance effectiveness
14.1 Strategic compliance KPIs
ENS_Strategic_KPIs:
  Compliance_Effectiveness:
    - "ENS Certification Readiness Score: Target 95%"
    - "Control Implementation Completeness: Target 100%"
    - "Control Effectiveness Rating: Target 90%"
    - "Audit Finding Reduction Rate: Target 20% YoY"
  Security_Performance:
    - "Security Incident Frequency: Target <2/month"
    - "Mean Time to Detect (MTTD): Target <1 hour"
    - "Mean Time to Respond (MTTR): Target <4 hours"
    - "Security Awareness Training Completion: Target 100%"
  Operational_Excellence:
    - "System Availability: Target 99.9%"
    - "Backup Success Rate: Target 100%"
    - "Change Success Rate: Target 98%"
    - "Vulnerability Remediation Time: Target <7 days"
  Regulatory_Compliance:
    - "Regulatory Notification Timeliness: Target 100%"
    - "Documentation Currency: Target 100%"
    - "Risk Assessment Frequency: Target Quarterly"
    - "Control Testing Coverage: Target 100%"
14.2 KPI tracking system
class ENSKPITracker:
    def __init__(self):
        self.kpi_definitions = self.load_kpi_definitions()
        self.target_values = self.load_target_values()
        self.historical_data = KPIHistoricalDatabase()

    def calculate_kpi_performance(self, time_period='current_quarter'):
        """Calculate KPI performance against targets."""
        kpi_results = {}
        for kpi_category, kpis in self.kpi_definitions.items():
            category_results = {}
            for kpi_name, kpi_config in kpis.items():
                current_value = self.collect_kpi_data(kpi_name, time_period)
                target_value = self.target_values[kpi_name]
                performance = {
                    'current_value': current_value,
                    'target_value': target_value,
                    'achievement_rate': self.calculate_achievement_rate(current_value, target_value, kpi_config),
                    'trend': self.calculate_trend(kpi_name, time_period),
                    'status': self.determine_kpi_status(current_value, target_value, kpi_config),
                    # Positive variance always means better than target.
                    'variance': current_value - target_value if kpi_config.get('higher_is_better', True) else target_value - current_value
                }
                category_results[kpi_name] = performance
            kpi_results[kpi_category] = category_results
        return kpi_results

    def generate_kpi_dashboard(self, kpi_results):
        """Generate the KPI dashboard."""
        dashboard_data = {
            'summary': {
                'total_kpis': sum(len(category) for category in kpi_results.values()),
                'kpis_on_target': sum(1 for category in kpi_results.values()
                                      for kpi in category.values()
                                      if kpi['status'] == 'ON_TARGET'),
                'kpis_at_risk': sum(1 for category in kpi_results.values()
                                    for kpi in category.values()
                                    if kpi['status'] == 'AT_RISK'),
                'kpis_off_target': sum(1 for category in kpi_results.values()
                                       for kpi in category.values()
                                       if kpi['status'] == 'OFF_TARGET')
            },
            'category_performance': {},
            'trending_kpis': self.identify_trending_kpis(kpi_results),
            'action_required': self.identify_action_required_kpis(kpi_results)
        }
        for category, kpis in kpi_results.items():
            if not kpis:
                # Skip empty categories to avoid dividing by zero.
                continue
            avg_achievement = sum(kpi['achievement_rate'] for kpi in kpis.values()) / len(kpis)
            dashboard_data['category_performance'][category] = {
                'average_achievement': avg_achievement,
                'status': self.determine_category_status(avg_achievement),
                'kpi_count': len(kpis)
            }
        return dashboard_data
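The tracker calls `calculate_achievement_rate` and `determine_kpi_status` without showing them. The sketch below is one plausible interpretation, not the actual implementation: it honors the `higher_is_better` flag the tracker already reads from `kpi_config`, caps achievement at 100%, and uses a hypothetical 90% floor to separate AT_RISK from OFF_TARGET.

```python
def calculate_achievement_rate(current_value, target_value, kpi_config):
    """Achievement as a percentage of target, capped at 100."""
    if kpi_config.get("higher_is_better", True):
        if target_value <= 0:
            return 100.0  # a non-positive target is treated as always met
        ratio = current_value / target_value
    else:
        if current_value <= 0:
            return 100.0  # nothing measured against a lower-is-better target
        ratio = target_value / current_value
    return min(max(ratio, 0.0), 1.0) * 100


def determine_kpi_status(current_value, target_value, kpi_config, at_risk_floor=90.0):
    """Classify a KPI; the 90% AT_RISK floor is an assumed value."""
    rate = calculate_achievement_rate(current_value, target_value, kpi_config)
    if rate >= 100.0:
        return "ON_TARGET"
    if rate >= at_risk_floor:
        return "AT_RISK"
    return "OFF_TARGET"


if __name__ == "__main__":
    # System availability: higher is better, target 99.9.
    print(determine_kpi_status(99.5, 99.9, {}))
    # Mean time to respond in hours: lower is better, target 4.
    print(determine_kpi_status(6, 4, {"higher_is_better": False}))
```

Expressing lower-is-better KPIs as target over current keeps every achievement rate on the same 0 to 100 scale, so the category averages computed by the tracker remain meaningful.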
15. Cross-references
- OP.MON.1: Integration with the intrusion detection system
- OP.CONT.1: Impact metrics for business continuity
- R-TF-013-002: Real-time risk data feed
- GP-013: General cybersecurity framework
- T-024-006: Monitoring and metrics procedure
- OP.EXT.3: Integrated supply chain metrics
- ORG.1: Security policy effectiveness metrics
- OP.PL.1: Up-to-date risk analysis metrics
Signature meaning
The signatures for the approval process of this document can be found in the verified commits at the repository for the QMS. As a reference, the team members who are expected to participate in this document and their roles in the approval process, as defined in Annex I Responsibility Matrix of the GP-001, are:
- Author: Team members involved
- Reviewer: JD-003, JD-004
- Approver: JD-001