# Distribuite Denial of Service Attacks

DDoS é um ataque cibernético onde múltiplos sistemas comprometidos são utilizados para direcionar tráfego malicioso para um único alvo, sobrecarregando seus recursos e tornando-o indisponível para usuários legítimos.

### **Princípio de Funcionamento**

```
Botnet Coordenada → Tráfego Massivo → Esgotamento de Recursos → Indisponibilidade
        ↓                  ↓                  ↓                    ↓
   Múltiplos devices   Ataque sincronizado   Servidor/Network    Serviço fica
   comprometidos        para o alvo           não consegue        inacessível
                                        processar requests
```

### **Características do DDoS**

* **Ataque distribuído** de múltiplas origens
* **Dificuldade de bloqueio** devido a diversidade de IPs
* **Uso de dispositivos legítimos comprometidos** (botnets)
* **Objetivo principal**: indisponibilidade do serviço
* **Frequentemente usado** como cortina de fumaça para outros ataques

### **Componentes de um Ataque DDoS**

1. **Handler** - Sistema que controla os agents
2. **Agent** - Dispositivo comprometido (zumbi)
3. **Botnet** - Rede de agents coordenados
4. **Target** - Alvo do ataque
5. **Command & Control (C\&C)** - Servidor de comando

***

## **⚡ Tipos e Vetores de Ataque**

### **Classificação por Camada OSI**

#### **Ataques de Camada 3/4 (Network/Transport)**

* **UDP Flood**: envio massivo de pacotes UDP
* **ICMP Flood**: envio massivo de pacotes ICMP
* **DNS Amplification**: ataque com amplificação DNS
* **SYN Flood**: explora handshake TCP
* **ACK Flood**: envio massivo de ACK packets
* **Fragmentation Attacks**: envio de pacotes fragmentados

#### **Ataques de Camada 7 (Application)**

* **HTTP Flood**: requests HTTP legítimos em massa
* **Slowloris**: mantém muitas conexões abertas
* **RUDY**: slow POST attack
* **SSL Renegotiation**: força renegociação SSL/TLS
* **THC-SSL-DOS**: explora handshake SSL

### **Vetores de Ataque Específicos**

* DNS Amplification
* HTTP/2 Rapid Reset
* Memcached Amplification

***

## **⚔️ Mecanismos de Ataque**

Um ataque DDoS costuma seguir uma sequência de preparação da botnet, comando e controle, execução sincronizada contra o alvo, escalação da pressão e término/ocultação.

***

## **🔍 Detecção e Monitoramento**

### **Sinais de Alerta de DDoS**

* **Indicadores de Performance**
  * Aumento súbito de utilização de CPU/RAM
  * Queda drástica na velocidade de rede
  * Timeouts e lentidão incomum
  * Aumento exponencial de logs
* **Indicadores de Rede**
  * Picos anormais de tráfego
  * Padrões de tráfego incomuns
  * Aumento de pacotes fragmentados
  * Muitas conexões SYN sem ACK
* **Indicadores de Aplicação**
  * Erros 5xx em massa
  * Aumento de requests para recursos estáticos
  * Padrões de user-agent suspeitos
  * Geografia de tráfego anormal

### **Sistema de Detecção Automatizada**

```python
#!/usr/bin/env python3
# ddos_detector.py

import time
import statistics
from collections import deque, defaultdict
import logging
import psutil
import requests

class DDoSDetector:
    """
    Sistema de detecção de ataques DDoS em tempo real
    """
    
    def __init__(self, config):
        self.config = config
        self.logger = logging.getLogger('ddos_detector')
        
        # Estatísticas de baseline
        self.baseline_metrics = {
            'requests_per_second': 0,
            'bandwidth_usage': 0,
            'connection_count': 0
        }
        
        # Janelas de tempo para análise
        self.request_window = deque(maxlen=300)  # 5 minutos
        self.bandwidth_window = deque(maxlen=60)  # 1 minuto
        self.connection_window = deque(maxlen=300)
        
        # Thresholds configuráveis
        self.thresholds = {
            'request_spike': 5.0,  # 5x aumento
            'bandwidth_spike': 3.0, # 3x aumento
            'connection_spike': 10.0 # 10x aumento
        }
        
        self.initialize_baseline()
    
    def initialize_baseline(self):
        """Estabelecer baseline normal do sistema"""
        # Coletar métricas por 5 minutos
        baseline_samples = []
        for _ in range(300):
            metrics = self.collect_metrics()
            baseline_samples.append(metrics)
            time.sleep(1)
        
        # Calcular médias
        self.baseline_metrics = {
            'requests_per_second': statistics.mean(
                [s['requests_per_second'] for s in baseline_samples]
            ),
            'bandwidth_usage': statistics.mean(
                [s['bandwidth_usage'] for s in baseline_samples]
            ),
            'connection_count': statistics.mean(
                [s['connection_count'] for s in baseline_samples]
            )
        }
        
        self.logger.info(f"Baseline estabelecida: {self.baseline_metrics}")
    
    def collect_metrics(self):
        """Coletar métricas do sistema"""
        # Requests por segundo (simulado - integrar com web server logs)
        requests_ps = self.get_requests_per_second()
        
        # Uso de banda (MB/s)
        bandwidth = psutil.net_io_counters().bytes_sent + psutil.net_io_counters().bytes_recv
        bandwidth_mbps = bandwidth / 1024 / 1024
        
        # Contagem de conexões
        connections = len(psutil.net_connections())
        
        return {
            'requests_per_second': requests_ps,
            'bandwidth_usage': bandwidth_mbps,
            'connection_count': connections,
            'timestamp': time.time()
        }
    
    def get_requests_per_second(self):
        """Obter requests por segundo do web server"""
        # Integrar com access logs do nginx/apache
        # Por enquanto, retornar valor simulado
        return random.randint(10, 100)
    
    def analyze_traffic_patterns(self, current_metrics):
        """Analisar padrões de tráfego para detecção de DDoS"""
        alerts = []
        
        # Verificar spikes em requests
        current_rps = current_metrics['requests_per_second']
        baseline_rps = self.baseline_metrics['requests_per_second']
        
        if baseline_rps > 0 and current_rps / baseline_rps > self.thresholds['request_spike']:
            alerts.append({
                'type': 'REQUEST_SPIKE',
                'severity': 'HIGH',
                'current': current_rps,
                'baseline': baseline_rps,
                'ratio': current_rps / baseline_rps
            })
        
        # Verificar spikes de banda
        current_bw = current_metrics['bandwidth_usage']
        baseline_bw = self.baseline_metrics['bandwidth_usage']
        
        if baseline_bw > 0 and current_bw / baseline_bw > self.thresholds['bandwidth_spike']:
            alerts.append({
                'type': 'BANDWIDTH_SPIKE',
                'severity': 'HIGH',
                'current': current_bw,
                'baseline': baseline_bw,
                'ratio': current_bw / baseline_bw
            })
        
        # Verificar spikes de conexões
        current_conn = current_metrics['connection_count']
        baseline_conn = self.baseline_metrics['connection_count']
        
        if baseline_conn > 0 and current_conn / baseline_conn > self.thresholds['connection_spike']:
            alerts.append({
                'type': 'CONNECTION_SPIKE',
                'severity': 'HIGH',
                'current': current_conn,
                'baseline': baseline_conn,
                'ratio': current_conn / baseline_conn
            })
        
        return alerts
    
    def detect_attack_vectors(self, network_data):
        """Detectar vetores de ataque específicos"""
        vectors_detected = []
        
        # Detectar SYN Flood
        if self.detect_syn_flood(network_data):
            vectors_detected.append('SYN_FLOOD')
        
        # Detectar UDP Flood
        if self.detect_udp_flood(network_data):
            vectors_detected.append('UDP_FLOOD')
        
        # Detectar HTTP Flood
        if self.detect_http_flood(network_data):
            vectors_detected.append('HTTP_FLOOD')
        
        return vectors_detected
    
    def detect_syn_flood(self, network_data):
        """Detectar SYN Flood baseado em conexões half-open"""
        syn_count = network_data.get('syn_packets', 0)
        syn_ack_count = network_data.get('syn_ack_packets', 0)
        
        # Se muitas SYNs sem correspondentes SYN-ACKs
        if syn_count > 1000 and syn_ack_count / syn_count < 0.1:
            return True
        return False
    
    def detect_udp_flood(self, network_data):
        """Detectar UDP Flood"""
        udp_packets = network_data.get('udp_packets', 0)
        total_packets = network_data.get('total_packets', 1)
        
        # Se mais de 70% do tráfego é UDP
        if udp_packets / total_packets > 0.7:
            return True
        return False
    
    def detect_http_flood(self, network_data):
        """Detectar HTTP Flood"""
        http_requests = network_data.get('http_requests', 0)
        unique_ips = network_data.get('unique_ips', 1)
        
        # Muitos requests por IP único
        if http_requests / unique_ips > 1000:
            return True
        return False
    
    def start_monitoring(self):
        """Iniciar monitoramento contínuo"""
        self.logger.info("Iniciando monitoramento DDoS...")
        
        while True:
            try:
                # Coletar métricas atuais
                current_metrics = self.collect_metrics()
                
                # Analisar padrões
                alerts = self.analyze_traffic_patterns(current_metrics)
                
                # Processar alertas
                for alert in alerts:
                    self.handle_alert(alert)
                
                # Atualizar baseline dinamicamente (se não em ataque)
                if not alerts:
                    self.update_baseline(current_metrics)
                
                time.sleep(1)  # Verificar a cada segundo
                
            except Exception as e:
                self.logger.error(f"Erro no monitoramento: {e}")
                time.sleep(5)
    
    def handle_alert(self, alert):
        """Processar alerta de DDoS"""
        self.logger.warning(f"ALERTA DDoS: {alert}")
        
        # Acionar ações de mitigação automática
        if alert['severity'] == 'HIGH':
            self.trigger_mitigation(alert)
        
        # Notificar equipe de segurança
        self.notify_security_team(alert)
    
    def trigger_mitigation(self, alert):
        """Acionar medidas de mitigação"""
        self.logger.info(f"Acionando mitigação para: {alert['type']}")
        
        # Implementar ações baseadas no tipo de alerta
        if alert['type'] == 'REQUEST_SPIKE':
            self.enable_rate_limiting()
        
        elif alert['type'] == 'BANDWIDTH_SPIKE':
            self.enable_traffic_shaping()
        
        elif alert['type'] == 'CONNECTION_SPIKE':
            self.enable_connection_limits()

# Configuração e uso
config = {
    'thresholds': {
        'request_spike': 5.0,
        'bandwidth_spike': 3.0,
        'connection_spike': 10.0
    }
}

detector = DDoSDetector(config)
detector.start_monitoring()
```

### **Monitoramento com Elastic Stack**

```yaml
# elasticsearch/ddos_detection.json
{
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "requests_per_second": {
              "gte": 1000
            }
          }
        },
        {
          "range": {
            "response_time": {
              "gte": 5000
            }
          }
        }
      ]
    }
  },
  "aggs": {
    "suspicious_ips": {
      "terms": {
        "field": "client_ip.keyword",
        "min_doc_count": 1000
      }
    }
  }
}
```

***

## **🛡️ Mitigação e Defesa**

### **Estratégias de Defesa em Camadas**

#### **1. Proteção de Infraestrutura**

**Configuração de Rede**

```bash
# iptables rules para mitigação DDoS
iptables -A INPUT -p tcp --syn -m limit --limit 1/s --limit-burst 3 -j ACCEPT
iptables -A INPUT -p tcp --syn -j DROP

# Limitar conexões por IP
iptables -A INPUT -p tcp --dport 80 -m connlimit --connlimit-above 20 -j DROP

# Proteção contra SYN Flood
sysctl -w net.ipv4.tcp_syncookies=1
sysctl -w net.ipv4.tcp_max_syn_backlog=2048
sysctl -w net.ipv4.tcp_synack_retries=2
```

**Configuração Nginx**

```nginx
# /etc/nginx/nginx.conf
http {
    # Rate limiting
    limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;
    limit_req_zone $binary_remote_addr zone=general:10m rate=100r/s;
    
    # Connection limiting
    limit_conn_zone $binary_remote_addr zone=addr:10m;
    
    # Timeouts reduzidos
    client_body_timeout 10;
    client_header_timeout 10;
    keepalive_timeout 30;
    send_timeout 10;
    
    # Buffer limits
    client_body_buffer_size 128k;
    client_header_buffer_size 1k;
    client_max_body_size 8m;
    large_client_header_buffers 2 1k;
    
    server {
        listen 80;
        
        # Rate limiting por location
        location /api/ {
            limit_req zone=api burst=20 nodelay;
            limit_conn addr 10;
            proxy_pass http://backend;
        }
        
        location / {
            limit_req zone=general burst=50 nodelay;
            limit_conn addr 20;
        }
    }
}
```

#### **2. Web Application Firewall (WAF) Rules**

```python
# waf_rules.py
class DDoSWAFRules:
    """
    Regras de WAF para mitigação de DDoS
    """
    
    def __init__(self):
        self.rules = self.load_rules()
    
    def load_rules(self):
        """Carregar regras de mitigação"""
        return [
            {
                'name': 'REQUEST_RATE_LIMIT',
                'condition': self.check_request_rate,
                'action': 'BLOCK_IP'
            },
            {
                'name': 'USER_AGENT_ANOMALY',
                'condition': self.check_user_agent,
                'action': 'CHALLENGE'
            },
            {
                'name': 'GEOGRAPHIC_ANOMALY',
                'condition': self.check_geographic,
                'action': 'BLOCK_REGION'
            }
        ]
    
    def check_request_rate(self, request, ip_stats):
        """Verificar taxa de requests por IP"""
        requests_last_minute = ip_stats.get('requests_60s', 0)
        return requests_last_minute > 1000  # Mais de 1000 req/minuto
    
    def check_user_agent(self, request):
        """Detectar user agents suspeitos"""
        user_agent = request.headers.get('User-Agent', '')
        suspicious_patterns = [
            'python', 'curl', 'wget', 'mass', 'scan'
        ]
        return any(pattern in user_agent.lower() for pattern in suspicious_patterns)
    
    def check_geographic(self, request, ip_info):
        """Verificar anomalias geográficas"""
        # Se tráfego vem de região não usual
        usual_regions = ['BR', 'US', 'DE']
        return ip_info.get('country') not in usual_regions
```

### **Sistema de Mitigação Automatizada**

```python
#!/usr/bin/env python3
# ddos_mitigator.py

import redis
import requests
from datetime import datetime, timedelta

class DDoSMitigator:
    """
    Sistema automatizado de mitigação de DDoS
    """
    
    def __init__(self):
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
        self.mitigation_rules = self.load_mitigation_rules()
        
    def load_mitigation_rules(self):
        """Carregar regras de mitigação"""
        return {
            'LOW': {
                'actions': ['RATE_LIMIT', 'CAPTCHA'],
                'threshold': 1000  # requests/minuto
            },
            'MEDIUM': {
                'actions': ['BLOCK_IP', 'ENABLE_WAF'],
                'threshold': 5000
            },
            'HIGH': {
                'actions': ['BLOCK_SUBNET', 'ENABLE_CDN'],
                'threshold': 10000
            }
        }
    
    def mitigate_attack(self, attack_data):
        """Executar mitigação baseada na severidade do ataque"""
        severity = self.assess_severity(attack_data)
        mitigation_plan = self.mitigation_rules.get(severity, {})
        
        self.log_mitigation(attack_data, severity)
        
        # Executar ações de mitigação
        for action in mitigation_plan.get('actions', []):
            self.execute_mitigation_action(action, attack_data)
        
        return severity
    
    def assess_severity(self, attack_data):
        """Avaliar severidade do ataque"""
        request_rate = attack_data.get('requests_per_minute', 0)
        bandwidth_usage = attack_data.get('bandwidth_mbps', 0)
        
        if request_rate > 10000 or bandwidth_usage > 1000:
            return 'HIGH'
        elif request_rate > 5000 or bandwidth_usage > 500:
            return 'MEDIUM'
        elif request_rate > 1000 or bandwidth_usage > 100:
            return 'LOW'
        else:
            return 'NONE'
    
    def execute_mitigation_action(self, action, attack_data):
        """Executar ação de mitigação específica"""
        if action == 'RATE_LIMIT':
            self.enable_rate_limiting(attack_data)
        elif action == 'BLOCK_IP':
            self.block_malicious_ips(attack_data)
        elif action == 'ENABLE_WAF':
            self.enable_waf_rules()
        elif action == 'BLOCK_SUBNET':
            self.block_subnets(attack_data)
        elif action == 'ENABLE_CDN':
            self.enable_cdn_protection()
    
    def enable_rate_limiting(self, attack_data):
        """Ativar rate limiting"""
        # Configurar nginx/iptables para rate limiting
        malicious_ips = attack_data.get('malicious_ips', [])
        
        for ip in malicious_ips:
            # Adicionar IP à lista de rate limiting
            self.redis_client.sadd('rate_limited_ips', ip)
    
    def block_malicious_ips(self, attack_data):
        """Bloquear IPs maliciosos"""
        malicious_ips = attack_data.get('malicious_ips', [])
        
        for ip in malicious_ips:
            # Adicionar IP à blacklist
            self.redis_client.sadd('blocked_ips', ip)
            
            # Aplicar bloqueio via iptables
            self.apply_iptables_block(ip)
    
    def apply_iptables_block(self, ip):
        """Aplicar bloqueio via iptables"""
        import subprocess
        try:
            subprocess.run([
                'iptables', '-A', 'INPUT', '-s', ip, '-j', 'DROP'
            ], check=True)
        except subprocess.CalledProcessError as e:
            print(f"Erro ao bloquear IP {ip}: {e}")
    
    def enable_waf_rules(self):
        """Ativar regras de WAF específicas para DDoS"""
        # Configurar regras no CloudFlare, AWS WAF, etc.
        waf_rules = {
            'rate_based_rules': True,
            'ip_reputation_lists': True,
            'geo_blocking': True
        }
        
        # Implementar ativação via API
        self.configure_external_waf(waf_rules)
    
    def block_subnets(self, attack_data):
        """Bloquear subnets inteiras se necessário"""
        suspicious_subnets = self.identify_suspicious_subnets(attack_data)
        
        for subnet in suspicious_subnets:
            self.redis_client.sadd('blocked_subnets', subnet)
    
    def enable_cdn_protection(self):
        """Ativar proteção de CDN"""
        # Configurar CDN para absorver tráfego
        cdn_config = {
            'cache_everything': True,
            'security_level': 'high',
            'challenge_ttl': 86400
        }
        
        self.configure_cdn(cdn_config)
    
    def log_mitigation(self, attack_data, severity):
        """Registrar ações de mitigação"""
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'severity': severity,
            'attack_data': attack_data,
            'mitigation_actions': self.mitigation_rules[severity]['actions']
        }
        
        self.redis_client.lpush('mitigation_logs', str(log_entry))

# Uso do mitigador
mitigator = DDoSMitigator()

# Quando um ataque é detectado
attack_data = {
    'requests_per_minute': 12000,
    'bandwidth_mbps': 1500,
    'malicious_ips': ['192.168.1.100', '10.0.0.50']
}

severity = mitigator.mitigate_attack(attack_data)
print(f"Ataque mitigado com severidade: {severity}")
```

### **Proteção com Cloud Services**

#### **1. AWS Shield & WAF**

```python
import boto3

class AWSDDoSProtection:
    """Configuração de proteção DDoS na AWS"""
    
    def __init__(self):
        self.waf_client = boto3.client('wafv2')
        self.shield_client = boto3.client('shield')
    
    def create_rate_based_rule(self, rule_name, rate_limit):
        """Criar regra baseada em taxa no AWS WAF"""
        response = self.waf_client.create_rule(
            Name=rule_name,
            MetricName=rule_name,
            RateKey='IP',
            RateLimit=rate_limit
        )
        return response
    
    def enable_shield_advanced(self):
        """Ativar AWS Shield Advanced"""
        # Shield Advanced fornece proteção DDoS adicional
        try:
            response = self.shield_client.create_subscription()
            return response
        except self.shield_client.exceptions.ResourceAlreadyExistsException:
            print("Shield Advanced já está ativado")
    
    def create_web_acl(self, acl_name, rules):
        """Criar Web ACL para proteção"""
        response = self.waf_client.create_web_acl(
            Name=acl_name,
            MetricName=acl_name,
            DefaultAction={'Allow': {}},
            Rules=rules,
            VisibilityConfig={
                'SampledRequestsEnabled': True,
                'CloudWatchMetricsEnabled': True,
                'MetricName': acl_name
            }
        )
        return response
```

#### **2. CloudFlare DDoS Protection**

```python
import requests

class CloudFlareProtection:
    """Configuração de proteção DDoS no CloudFlare"""
    
    def __init__(self, api_key, email):
        self.api_key = api_key
        self.email = email
        self.base_url = "https://api.cloudflare.com/client/v4"
    
    def set_security_level(self, zone_id, level):
        """Configurar nível de segurança"""
        # levels: essentially_off, low, medium, high, under_attack
        url = f"{self.base_url}/zones/{zone_id}/settings/security_level"
        data = {'value': level}
        
        response = requests.patch(
            url,
            json=data,
            headers={
                'X-Auth-Email': self.email,
                'X-Auth-Key': self.api_key,
                'Content-Type': 'application/json'
            }
        )
        return response.json()
    
    def enable_under_attack_mode(self, zone_id, enable=True):
        """Ativar modo Under Attack"""
        url = f"{self.base_url}/zones/{zone_id}/settings/security_level"
        level = 'under_attack' if enable else 'high'
        
        return self.set_security_level(zone_id, level)
    
    def create_firewall_rule(self, zone_id, expression, action):
        """Criar regra de firewall"""
        url = f"{self.base_url}/zones/{zone_id}/firewall/rules"
        data = {
            'filter': {
                'expression': expression,
                'paused': False
            },
            'action': action,
            'description': 'DDoS Protection Rule'
        }
        
        response = requests.post(
            url,
            json={'rules': [data]},
            headers={
                'X-Auth-Email': self.email,
                'X-Auth-Key': self.api_key,
                'Content-Type': 'application/json'
            }
        )
        return response.json()
```

***

## **🚨 Resposta a Incidentes**

{% stepper %}
{% step %}

### **Phase 1 Detection**

* Monitor traffic patterns
* Analyze anomalies
* Confirm attack
* Assess impact
  {% endstep %}

{% step %}

### **Phase 2 Containment**

* Activate incident response team
* Enable mitigation measures
* Communicate with stakeholders
* Preserve evidence
  {% endstep %}

{% step %}

### **Phase 3 Eradication**

* Identify attack source
* Block malicious traffic
* Update security measures
* Coordinate with ISPs
  {% endstep %}

{% step %}

### **Phase 4 Recovery**

* Gradually restore services
* Monitor for secondary attacks
* Conduct post-mortem
* Update incident response plan
  {% endstep %}
  {% endstepper %}

### **Checklist de Resposta Rápida**

```python
# incident_response.py
class DDoSResponseChecklist:
    """
    Checklist para resposta rápida a ataques DDoS
    """
    
    def __init__(self):
        self.checklist = self.load_checklist()
    
    def load_checklist(self):
        """Carregar checklist de resposta"""
        return {
            'IMMEDIATE_ACTIONS': [
                'Confirmar que é um ataque DDoS',
                'Ativar equipe de resposta a incidentes',
                'Notificar provedor de hospedagem/CDN',
                'Iniciar gravação de logs detalhados',
                'Comunicar-se com stakeholders'
            ],
            'MITIGATION_ACTIONS': [
                'Ativar modo de proteção no CDN',
                'Implementar rate limiting agressivo',
                'Bloquear IPs/regiões maliciosas',
                'Habilitar CAPTCHA/challenges',
                'Redirecionar tráfego para scrubbing center'
            ],
            'COMMUNICATION_ACTIONS': [
                'Atualizar página de status',
                'Notificar clientes afetados',
                'Comunicar-se com a imprensa se necessário',
                'Manter equipe interna informada'
            ],
            'POST_INCIDENT_ACTIONS': [
                'Conduzir análise post-mortem',
                'Atualizar planos de mitigação',
                'Rever configurações de segurança',
                'Treinar equipe com lições aprendidas'
            ]
        }
    
    def execute_response(self, attack_severity):
        """Executar resposta baseada na severidade"""
        print(f"Executando resposta para ataque de severidade: {attack_severity}")
        
        for category, actions in self.checklist.items():
            print(f"\n{category}:")
            for action in actions:
                print(f"  [ ] {action}")
        
        # Implementar ações automaticamente baseado na severidade
        if attack_severity == 'HIGH':
            self.execute_automated_mitigation()
```

***

## **🔧 Ferramentas e Testes**

### **Ferramentas de Monitoramento**

```bash
# Comandos úteis para monitoramento
netstat -an | grep :80 | wc -l  # Conexões na porta 80
ss -s  # Estatísticas de sockets
iftop  # Monitoramento de banda em tempo real
nethogs  # Uso de banda por processo

# Monitoramento com tcpdump
tcpdump -i any -n port 80 | head -1000

# Análise de logs em tempo real
tail -f /var/log/nginx/access.log | awk '{print $1}' | sort | uniq -c | sort -nr
```

***

## **📋 Checklists de Segurança**

### **Checklist de Prevenção DDoS**

* [ ] **Preparação e Planejamento**
  * [ ] Plano de resposta a incidentes documentado
  * [ ] Equipe de resposta designada e treinada
  * [ ] Contatos de emergência com provedores
  * [ ] Backup e recovery procedures testados
* [ ] **Proteção de Infraestrutura**
  * [ ] Rate limiting configurado em load balancers
  * [ ] Firewalls com proteção contra SYN flood
  * [ ] Web Application Firewall (WAF) implementado
  * [ ] CDN com proteção DDoS ativada
* [ ] **Monitoramento e Detecção**
  * [ ] Sistema de monitoramento de tráfego em tempo real
  * [ ] Alertas configurados para anomalias
  * [ ] Baseline de tráfego normal estabelecida
  * [ ] Logs centralizados e retidos
* [ ] **Resposta e Mitigação**
  * [ ] Procedimentos de escalação definidos
  * [ ] Contrato com provedor de mitigação DDoS
  * [ ] Scripts de mitigação automatizada preparados
  * [ ] Plano de comunicação para incidentes

### **Checklist de Resposta a Incidentes**

* [ ] **Detecção e Análise**
  * [ ] Confirmar natureza do ataque
  * [ ] Identificar vetor de ataque
  * [ ] Avaliar impacto nos serviços
  * [ ] Determinar severidade
* [ ] **Contenção e Mitigação**
  * [ ] Ativar proteções de CDN/WAF
  * [ ] Implementar rate limiting agressivo
  * [ ] Bloquear IPs/regiões maliciosas
  * [ ] Contatar provedor de mitigação
* [ ] **Comunicação**
  * [ ] Notificar equipe interna
  * [ ] Atualizar página de status
  * [ ] Comunicar-se com clientes afetados
  * [ ] Coordenar com equipe de PR
* [ ] **Recuperação e Análise**
  * [ ] Monitorar retorno à normalidade
  * [ ] Conduzir análise post-mortem
  * [ ] Atualizar planos de resposta
  * [ ] Implementar melhorias identificadas

### **Checklist de Recuperação Pós-Ataque**

* [ ] **Análise Forense**
  * [ ] Coletar e preservar logs
  * [ ] Analisar padrões de ataque
  * [ ] Identificar vulnerabilidades exploradas
  * [ ] Documentar lições aprendidas
* [ ] **Melhorias de Segurança**
  * [ ] Revisar e atualizar regras de WAF
  * [ ] Ajustar thresholds de monitoramento
  * [ ] Implementar proteções adicionais
  * [ ] Atualizar plano de resposta
* [ ] **Comunicação e Relatórios**
  * [ ] Preparar relatório executivo
  * [ ] Comunicar melhorias implementadas
  * [ ] Atualizar documentação de segurança
  * [ ] Realizar treinamento da equipe

***

## **📊 Exemplos de Implementação**

### **Sistema Completo de Proteção DDoS**

```python
# comprehensive_ddos_protection.py
import asyncio
import logging
from datetime import datetime
from typing import Dict, List
import redis
import psutil

class ComprehensiveDDoSProtection:
    """
    Sistema abrangente de proteção contra DDoS
    """
    
    def __init__(self, config: Dict):
        self.config = config
        self.redis_client = redis.Redis(
            host=config['redis_host'],
            port=config['redis_port'],
            db=0
        )
        self.logger = self.setup_logging()
        
        # Sistemas de proteção
        self.detector = DDoSDetector(config)
        self.mitigator = DDoSMitigator()
        self.monitor = TrafficMonitor(config)
        
        # Estatísticas
        self.attack_history = []
        self.mitigation_stats = {
            'attacks_blocked': 0,
            'false_positives': 0,
            'downtime_minutes': 0
        }
    
    def setup_logging(self):
        """Configurar sistema de logging"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('ddos_protection.log'),
                logging.StreamHandler()
            ]
        )
        return logging.getLogger(__name__)
    
    async def start_protection_system(self):
        """Iniciar sistema completo de proteção"""
        self.logger.info("Iniciando sistema de proteção DDoS")
        
        # Iniciar componentes
        tasks = [
            asyncio.create_task(self.monitor.start_monitoring()),
            asyncio.create_task(self.detector.start_detection()),
            asyncio.create_task(self.health_check_loop())
        ]
        
        await asyncio.gather(*tasks)
    
    async def health_check_loop(self):
        """Loop de verificação de saúde do sistema"""
        while True:
            try:
                # Verificar recursos do sistema
                cpu_percent = psutil.cpu_percent()
                memory_percent = psutil.virtual_memory().percent
                
                if cpu_percent > 90 or memory_percent > 90:
                    self.logger.warning(
                        f"Alta utilização de recursos - CPU: {cpu_percent}%, Mem: {memory_percent}%"
                    )
                
                # Verificar conectividade com serviços
                await self.check_service_health()
                
                await asyncio.sleep(60)  # Verificar a cada minuto
                
            except Exception as e:
                self.logger.error(f"Erro no health check: {e}")
                await asyncio.sleep(30)
    
    async def check_service_health(self):
        """Verificar saúde dos serviços de proteção"""
        services = [
            ('Redis', self.redis_client.ping),
            # Adicionar verificações para WAF, CDN, etc.
        ]
        
        for service_name, check_func in services:
            try:
                check_func()
                self.logger.debug(f"Serviço {service_name} está saudável")
            except Exception as e:
                self.logger.error(f"Serviço {service_name} indisponível: {e}")
    
    def handle_detected_attack(self, attack_data: Dict):
        """Manipular ataque detectado"""
        self.logger.warning(f"Ataque DDoS detectado: {attack_data}")
        
        # Registrar ataque
        self.attack_history.append({
            'timestamp': datetime.now(),
            'data': attack_data
        })
        
        # Executar mitigação
        severity = self.mitigator.mitigate_attack(attack_data)
        
        # Atualizar estatísticas
        self.mitigation_stats['attacks_blocked'] += 1
        
        # Notificar equipe
        self.notify_security_team(attack_data, severity)
    
    def notify_security_team(self, attack_data: Dict, severity: str):
        """Notificar equipe de segurança"""
        message = f"""
🚨 ALERTA DDoS - Severidade: {severity} 🚨

Detalhes do Ataque:
- Target: {attack_data.get('target', 'N/A')}
- Requests/min: {attack_data.get('requests_per_minute', 0)}
- Banda: {attack_data.get('bandwidth_mbps', 0)} Mbps
- IPs envolvidos: {len(attack_data.get('malicious_ips', []))}

Ações de Mitigação:
{self.mitigator.mitigation_rules.get(severity, {}).get('actions', [])}

Timestamp: {datetime.now()}
        """
        
        # Enviar notificação (email, Slack, SMS, etc.)
        self.send_notification(message)
    
    def send_notification(self, message: str):
        """Enviar notificação para equipe"""
        # Implementar integração com sistemas de notificação
        # Email, Slack, PagerDuty, SMS, etc.
        print(f"NOTIFICAÇÃO: {message}")
        
        # Exemplo com Slack
        try:
            requests.post(
                self.config['slack_webhook'],
                json={'text': message}
            )
        except Exception as e:
            self.logger.error(f"Erro ao enviar notificação Slack: {e}")

# Configuração do sistema
config = {
    'redis_host': 'localhost',
    'redis_port': 6379,
    'slack_webhook': 'https://hooks.slack.com/services/...',
    'detection_thresholds': {
        'request_spike': 5.0,
        'bandwidth_spike': 3.0
    },
    'monitoring_intervals': {
        'traffic': 1,  # segundos
        'system_health': 60  # segundos
    }
}

# Inicializar e executar sistema
protection_system = ComprehensiveDDoSProtection(config)
asyncio.run(protection_system.start_protection_system())
```

### **Dashboard de Monitoramento**

```python
# monitoring_dashboard.py
from flask import Flask, render_template, jsonify
import psutil
import time

app = Flask(__name__)

class DDoSDashboard:
    """Dashboard para monitoramento DDoS em tempo real"""
    
    def __init__(self):
        self.metrics_history = {
            'requests_per_second': [],
            'bandwidth_usage': [],
            'connection_count': []
        }
    
    def get_current_metrics(self):
        """Obter métricas atuais do sistema"""
        return {
            'requests_per_second': self.get_requests_per_second(),
            'bandwidth_usage': self.get_bandwidth_usage(),
            'connection_count': self.get_connection_count(),
            'cpu_usage': psutil.cpu_percent(),
            'memory_usage': psutil.virtual_memory().percent,
            'timestamp': time.time()
        }
    
    def update_metrics_history(self):
        """Atualizar histórico de métricas"""
        current_metrics = self.get_current_metrics()
        
        for key in self.metrics_history:
            if key in current_metrics:
                self.metrics_history[key].append(current_metrics[key])
                # Manter apenas últimas 100 amostras
                if len(self.metrics_history[key]) > 100:
                    self.metrics_history[key].pop(0)

dashboard = DDoSDashboard()

@app.route('/')
def index():
    """Página principal do dashboard"""
    return render_template('dashboard.html')

@app.route('/api/metrics')
def get_metrics():
    """API para obter métricas em tempo real"""
    dashboard.update_metrics_history()
    return jsonify({
        'current': dashboard.get_current_metrics(),
        'history': dashboard.metrics_history
    })

@app.route('/api/alerts')
def get_alerts():
    """API para obter alertas recentes"""
    # Implementar obtenção de alertas do sistema
    return jsonify({'alerts': []})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)
```

***

## **⚠️ Considerações Finais**

### **Mitos Comuns sobre DDoS**

* ❌ "Só sites grandes são alvos" → **FALSO** (PMEs são alvos frequentes)
* ❌ "Firewalls resolvem DDoS" → **FALSO** (firewalls podem ser sobrecarregados)
* ❌ "Ataques DDoS são sempre óbvios" → **FALSO** (ataques low-and-slow existem)
* ❌ "Proteção DDoS é muito cara" → **FALSO** (soluções acessíveis existem)

### **Estatísticas Importantes**

```
- 37% das organizações sofrem ataques DDoS a cada ano
- Ataques DDoS cresceram 50% em 2023
- Custo médio por ataque: $50,000 - $100,000
- 65% dos ataques usam múltiplos vetores
- Tempo médio de mitigação: 30-60 minutos
```

### **Tendências Emergentes**

1. **Ataques Multi-Vetor**: Combinação de diferentes técnicas
2. **IA-Generated Attacks**: Uso de machine learning para evasão
3. **IoT Botnets**: Dispositivos IoT como vetores principais
4. **Ransom DDoS**: Extorsão combinada com ataques

### **Boas Práticas Essenciais**

1. **Defense in Depth**: Múltiplas camadas de proteção
2. **Continuous Monitoring**: Vigilância 24/7
3. **Incident Response Planning**: Preparação prévia
4. **Third-Party Partnerships**: Colaboração com especialistas

### **Referências e Recursos**

* OWASP DDoS Protection Cheat Sheet
* NIST SP 800-83: Guide to Malware Incident Prevention
* CIS DDoS Mitigation Guidelines
* Cloud Security Alliance DDoS Best Practices

**🔐 Lembre-se**: A proteção contra DDoS não é um produto, mas um processo contínuo. Implemente defesas em profundidade, monitore proativamente e mantenha um plano de resposta atualizado. A preparação é a melhor defesa contra ataques DDoS.


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://0xmorte.gitbook.io/bibliadopentestbr/tecnicas/web/server-side-infraestrutura-web/distribuite-denial-of-service-attacks.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
