# Envenenamento de Dados (Data Poisoning)

### 📑 **Índice**

1. Fundamentos do Data Poisoning
2. Arquitetura e Mecanismos
3. Tipos de Ataques
4. Técnicas de Exploração
5. Implementação em Python
6. Cenários de Ataque Reais
7. Detecção e Mitigação
8. Ferramentas e Frameworks

***

### 🔍 **Fundamentos do Data Poisoning**

#### **O que é Data Poisoning?**

**Data Poisoning (Envenenamento de Dados)** é uma técnica de ataque adversarial onde um invasor insere dados maliciosos no conjunto de treinamento de um modelo de machine learning para comprometer sua integridade, desempenho ou comportamento. Diferente dos ataques adversariais que ocorrem durante a inferência, o data poisoning ataca a fase de treinamento, corrompendo o modelo desde sua origem.

#### **Contexto Histórico**

```yaml
Evolução do Data Poisoning:
  2012: Primeiros ataques documentados em sistemas de recomendação
  2014: Ataques de backdoor em modelos de classificação
  2016: Poisoning em modelos de aprendizado federado
  2018: Ataques a datasets públicos (ImageNet, MNIST)
  2020: Envenenamento de modelos de linguagem (GPT, BERT)
  2022: Ataques a sistemas de aprendizado por reforço
  2024: Ataques a modelos multimodais e foundation models

Motivação:
  ✅ Modelos são tão bons quanto seus dados de treino
  ✅ Datasets públicos são vulneráveis a injeção
  ✅ Ataques podem ser extremamente furtivos
  ✅ Pode comprometer sistemas críticos sem acesso direto ao modelo
```

#### **Princípio de Funcionamento**

```mermaid
graph TD
    subgraph "Pipeline Normal"
        A[Dados de Treino] --> B[Modelo IA]
        B --> C[Modelo Limpo]
    end
    
    subgraph "Pipeline Envenenada"
        D[Dados Legítimos] --> E[Injeção Maliciosa]
        E --> F[Dados Envenenados]
        F --> G[Modelo IA]
        G --> H[Modelo Comprometido]
        I[Comportamento Esperado] -.->|"❌"| H
        J[Comportamento Malicioso] -->|"✅"| H
    end
```

#### **Por que Data Poisoning é Perigoso?**

```yaml
Impacto do Envenenamento:
  
  Crítico:
    - Backdoor em sistemas de autenticação (reconhecimento facial)
    - Manipulação de sistemas de recomendação (fraude financeira)
    - Comprometimento de modelos de detecção de intrusão
  
  Alto:
    - Degradação geral de desempenho
    - Viés indesejado em decisões automatizadas
    - Evasão de filtros de conteúdo
  
  Médio:
    - Custos adicionais de re-treinamento
    - Perda de confiança no sistema
    - Necessidade de auditoria extensiva
```

***

### 🏗️ **Arquitetura e Mecanismos**

#### **Tipos de Envenenamento**

```yaml
Classificação por Escopo:
  
  Envenenamento de Etiquetas (Label Poisoning):
    - Inversão de rótulos (gato → cachorro)
    - Atribuição de rótulos incorretos
    - Rótulos ambíguos ou contraditórios
  
  Envenenamento de Características (Feature Poisoning):
    - Injeção de outliers
    - Modificação de valores de atributos
    - Adição de ruído estruturado
  
  Envenenamento de Backdoor:
    - Inserção de triggers específicos
    - Comportamento normal para dados limpos
    - Comportamento malicioso para dados com trigger
  
  Envenenamento de Modelos Federados:
    - Atualizações maliciosas de clientes
    - Envenenamento de gradientes
    - Sybil attacks
```

#### **Métricas de Ataque**

```python
#!/usr/bin/env python3
# poisoning_metrics.py

import numpy as np

class PoisoningMetrics:
    """
    Métricas para avaliar ataques de data poisoning
    """
    
    @staticmethod
    def poison_rate(poisoned_samples, total_samples):
        """Taxa de envenenamento"""
        return poisoned_samples / total_samples
    
    @staticmethod
    def attack_success_rate(successful_attacks, total_attacks):
        """Taxa de sucesso do ataque"""
        return successful_attacks / total_attacks
    
    @staticmethod
    def model_accuracy_drop(clean_accuracy, poisoned_accuracy):
        """Queda de acurácia do modelo"""
        return clean_accuracy - poisoned_accuracy
    
    @staticmethod
    def backdoor_activation_rate(triggered_samples, total_triggered):
        """Taxa de ativação de backdoor"""
        return triggered_samples / total_triggered
    
    @staticmethod
    def stealthiness(original_distribution, poisoned_distribution):
        """Furtividade do ataque (similaridade das distribuições)"""
        from scipy.stats import wasserstein_distance
        return wasserstein_distance(original_distribution, poisoned_distribution)
```

***

### 🎯 **Tipos de Ataques**

#### **1. Label Flipping (Inversão de Rótulos)**

```python
#!/usr/bin/env python3
# label_flipping.py

import numpy as np
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

class LabelFlippingAttack:
    """
    Ataque de inversão de rótulos - muda rótulos de classes específicas
    """
    
    def __init__(self, source_class, target_class, poison_fraction=0.1):
        self.source_class = source_class
        self.target_class = target_class
        self.poison_fraction = poison_fraction
    
    def poison_dataset(self, X, y):
        """
        Envenenar dataset invertendo rótulos de uma classe específica
        """
        X_poisoned = X.copy()
        y_poisoned = y.copy()
        
        # Identificar amostras da classe fonte
        source_indices = np.where(y == self.source_class)[0]
        
        # Selecionar amostras para envenenar
        num_poison = int(len(source_indices) * self.poison_fraction)
        poison_indices = np.random.choice(source_indices, num_poison, replace=False)
        
        # Inverter rótulos
        y_poisoned[poison_indices] = self.target_class
        
        print(f"[*] {len(poison_indices)} amostras envenenadas")
        print(f"    Classe {self.source_class} → Classe {self.target_class}")
        
        return X_poisoned, y_poisoned
    
    def evaluate_impact(self, model, X_test, y_test):
        """Avaliar impacto do envenenamento"""
        model.fit(X_test, y_test)
        return model.score(X_test, y_test)

# Exemplo
if __name__ == "__main__":
    # Carregar dados
    iris = load_iris()
    X_train, X_test, y_train, y_test = train_test_split(
        iris.data, iris.target, test_size=0.3, random_state=42
    )
    
    # Ataque: fazer classe 0 (setosa) ser classificada como classe 1 (versicolor)
    attack = LabelFlippingAttack(source_class=0, target_class=1, poison_fraction=0.3)
    X_poisoned, y_poisoned = attack.poison_dataset(X_train, y_train)
    
    # Comparar modelos
    clean_model = RandomForestClassifier()
    poisoned_model = RandomForestClassifier()
    
    clean_model.fit(X_train, y_train)
    poisoned_model.fit(X_poisoned, y_poisoned)
    
    print(f"\nAcurácia modelo limpo: {clean_model.score(X_test, y_test):.4f}")
    print(f"Acurácia modelo envenenado: {poisoned_model.score(X_test, y_test):.4f}")
```

#### **2. Backdoor Attack (Trigger-based)**

```python
#!/usr/bin/env python3
# backdoor_attack.py

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

class BackdoorAttack:
    """
    Ataque de backdoor - insere trigger que ativa comportamento malicioso
    """
    
    def __init__(self, trigger_pattern, target_label, poison_fraction=0.05):
        self.trigger_pattern = trigger_pattern
        self.target_label = target_label
        self.poison_fraction = poison_fraction
    
    def add_trigger(self, image):
        """
        Adicionar trigger à imagem
        """
        poisoned = image.copy()
        h, w = poisoned.shape[:2]
        
        # Adicionar trigger no canto superior direito
        trigger_h, trigger_w = self.trigger_pattern.shape[:2]
        poisoned[:trigger_h, -trigger_w:] = self.trigger_pattern
        
        return poisoned
    
    def poison_dataset(self, images, labels):
        """
        Envenenar dataset com amostras contendo trigger
        """
        images_poisoned = images.copy()
        labels_poisoned = labels.copy()
        
        # Selecionar amostras para envenenar
        num_poison = int(len(images) * self.poison_fraction)
        poison_indices = np.random.choice(len(images), num_poison, replace=False)
        
        for idx in poison_indices:
            images_poisoned[idx] = self.add_trigger(images[idx])
            labels_poisoned[idx] = self.target_label
        
        print(f"[*] {num_poison} amostras envenenadas com trigger")
        print(f"    Label alvo: {self.target_label}")
        
        return images_poisoned, labels_poisoned
    
    def test_backdoor(self, model, test_images, test_labels):
        """
        Testar eficácia do backdoor
        """
        # Adicionar trigger às imagens de teste
        triggered_images = np.array([self.add_trigger(img) for img in test_images])
        
        # Avaliar modelo
        clean_acc = model.evaluate(test_images, test_labels)
        backdoor_acc = model.evaluate(triggered_images, [self.target_label] * len(triggered_images))
        
        print(f"Acurácia em dados limpos: {clean_acc:.4f}")
        print(f"Acurácia em dados com trigger: {backdoor_acc:.4f}")
        
        return backdoor_acc
```

#### **3. Gradient Poisoning (Aprendizado Federado)**

```python
#!/usr/bin/env python3
# gradient_poisoning.py

import torch
import torch.nn as nn
import numpy as np

class GradientPoisoningAttack:
    """
    Ataque de envenenamento de gradientes em aprendizado federado
    """
    
    def __init__(self, target_label, poison_scale=10.0):
        self.target_label = target_label
        self.poison_scale = poison_scale
    
    def compute_malicious_gradient(self, model, data, labels, target_label):
        """
        Calcular gradiente malicioso para causar backdoor
        """
        # Forward pass com dados limpos
        outputs = model(data)
        loss_clean = nn.CrossEntropyLoss()(outputs, labels)
        
        # Forward pass com dados envenenados
        poisoned_labels = torch.full_like(labels, target_label)
        outputs_poisoned = model(data)
        loss_poisoned = nn.CrossEntropyLoss()(outputs_poisoned, poisoned_labels)
        
        # Calcular gradientes
        model.zero_grad()
        loss_clean.backward(retain_graph=True)
        clean_grad = [p.grad.clone() for p in model.parameters()]
        
        model.zero_grad()
        loss_poisoned.backward()
        poisoned_grad = [p.grad.clone() for p in model.parameters()]
        
        # Combinar gradientes (escala maliciosa)
        malicious_grad = []
        for cg, pg in zip(clean_grad, poisoned_grad):
            mg = cg + self.poison_scale * pg
            malicious_grad.append(mg)
        
        return malicious_grad
    
    def apply_malicious_update(self, model, malicious_grad, learning_rate=0.01):
        """
        Aplicar atualização maliciosa ao modelo
        """
        with torch.no_grad():
            for param, grad in zip(model.parameters(), malicious_grad):
                param -= learning_rate * grad
    
    def federated_poisoning(self, model, clients_data, malicious_client_id, num_rounds=10):
        """
        Ataque em configuração federada
        """
        for round in range(num_rounds):
            # Coletar gradientes dos clientes
            client_gradients = []
            
            for client_id, (data, labels) in clients_data.items():
                if client_id == malicious_client_id:
                    # Cliente malicioso envia gradiente envenenado
                    grad = self.compute_malicious_gradient(
                        model, data, labels, self.target_label
                    )
                else:
                    # Clientes honestos
                    model.zero_grad()
                    outputs = model(data)
                    loss = nn.CrossEntropyLoss()(outputs, labels)
                    loss.backward()
                    grad = [p.grad.clone() for p in model.parameters()]
                
                client_gradients.append(grad)
            
            # Agregar gradientes (média)
            avg_grad = []
            for params in zip(*client_gradients):
                avg_grad.append(torch.stack(params).mean(dim=0))
            
            # Atualizar modelo
            self.apply_malicious_update(model, avg_grad)
            
            print(f"Round {round+1}: Modelo atualizado")
        
        return model
```

#### **4. Data Injection Attack**

```python
#!/usr/bin/env python3
# data_injection.py

import numpy as np
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier

class DataInjectionAttack:
    """
    Ataque de injeção de dados maliciosos no dataset de treino
    """
    
    def __init__(self, target_class, poison_ratio=0.1):
        self.target_class = target_class
        self.poison_ratio = poison_ratio
    
    def generate_poison_samples(self, X_clean, y_clean, num_samples):
        """
        Gerar amostras envenenadas
        """
        # Estratégia 1: Outliers extremos
        X_mean = X_clean.mean(axis=0)
        X_std = X_clean.std(axis=0)
        
        # Criar amostras extremas (média + 5 desvios)
        X_poison = X_mean + 5 * X_std * np.random.randn(num_samples, X_clean.shape[1])
        y_poison = np.full(num_samples, self.target_class)
        
        # Estratégia 2: Amostras na fronteira de decisão (para SVM)
        # (implementação avançada)
        
        return X_poison, y_poison
    
    def poison_dataset(self, X_train, y_train):
        """
        Injetar amostras maliciosas no dataset
        """
        num_poison = int(len(X_train) * self.poison_ratio)
        X_poison, y_poison = self.generate_poison_samples(X_train, y_train, num_poison)
        
        X_poisoned = np.vstack([X_train, X_poison])
        y_poisoned = np.hstack([y_train, y_poison])
        
        print(f"[*] {num_poison} amostras injetadas")
        print(f"    Dataset original: {len(X_train)}")
        print(f"    Dataset envenenado: {len(X_poisoned)}")
        
        return X_poisoned, y_poisoned
    
    def attack_svm_decision_boundary(self, svm_model, X_boundary, y_boundary):
        """
        Ataque específico para SVM - mover fronteira de decisão
        """
        # Identificar support vectors
        support_vectors = svm_model.support_vectors_
        
        # Adicionar amostras que forçam a fronteira a se mover
        X_attack = X_boundary.copy()
        y_attack = y_boundary.copy()
        
        # Modificar amostras próximas à fronteira
        decision_values = svm_model.decision_function(X_boundary)
        boundary_indices = np.where(np.abs(decision_values) < 0.5)[0]
        
        for idx in boundary_indices:
            # Mover amostra para o outro lado da fronteira
            if y_attack[idx] == 1:
                X_attack[idx] += 0.1 * svm_model.coef_[0]
            else:
                X_attack[idx] -= 0.1 * svm_model.coef_[0]
        
        return X_attack, y_attack
```

#### **5. Model Poisoning (Ataque ao Modelo)**

```python
#!/usr/bin/env python3
# model_poisoning.py

import torch
import torch.nn as nn
import copy

class ModelPoisoningAttack:
    """
    Ataque direto ao modelo durante o treinamento
    """
    
    def __init__(self, target_layers, malicious_weights):
        self.target_layers = target_layers
        self.malicious_weights = malicious_weights
    
    def poison_during_training(self, model, optimizer, data_loader, epochs, poison_step=5):
        """
        Envenenar modelo durante o treinamento
        """
        model.train()
        
        for epoch in range(epochs):
            total_loss = 0
            
            for batch_idx, (data, target) in enumerate(data_loader):
                optimizer.zero_grad()
                output = model(data)
                loss = nn.CrossEntropyLoss()(output, target)
                loss.backward()
                optimizer.step()
                
                total_loss += loss.item()
            
            # Aplicar envenenamento em épocas específicas
            if (epoch + 1) % poison_step == 0:
                self.apply_poison(model)
                print(f"Época {epoch+1}: Envenenamento aplicado")
            
            print(f"Época {epoch+1}: Loss = {total_loss/len(data_loader):.4f}")
        
        return model
    
    def apply_poison(self, model):
        """
        Aplicar pesos maliciosos ao modelo
        """
        with torch.no_grad():
            for name, param in model.named_parameters():
                if any(layer in name for layer in self.target_layers):
                    # Sobrescrever pesos
                    param.copy_(self.malicious_weights[name])
    
    def create_neural_backdoor(self, model, trigger_pattern, target_output):
        """
        Criar backdoor neural específico
        """
        # Congelar todas as camadas
        for param in model.parameters():
            param.requires_grad = False
        
        # Adicionar camada de backdoor
        backdoor_layer = nn.Linear(model.classifier[0].in_features, 1)
        
        # Treinar apenas a camada de backdoor
        # (implementação simplificada)
        
        return model
```

***

### 🔬 **Técnicas Avançadas**

#### **1. Clean Label Attack**

```python
#!/usr/bin/env python3
# clean_label_attack.py

import numpy as np
from scipy.optimize import minimize

class CleanLabelAttack:
    """
    Ataque de rótulo limpo - amostras parecem normais, mas são maliciosas
    """
    
    def __init__(self, target_class, epsilon=0.1):
        self.target_class = target_class
        self.epsilon = epsilon
    
    def generate_poison_sample(self, base_image, target_image, model):
        """
        Gerar amostra envenenada que mantém rótulo original
        mas causa erro no modelo
        """
        # Otimizar perturbação mínima que faz a imagem se parecer com a base
        # mas ser classificada como alvo
        
        def loss_fn(delta):
            perturbed = base_image + delta
            features_base = model.extract_features(base_image)
            features_perturbed = model.extract_features(perturbed)
            
            # Perda de similaridade com imagem base
            similarity_loss = np.linalg.norm(features_base - features_perturbed)
            
            # Perda de classificação (queremos que seja classificada como alvo)
            pred = model.predict(perturbed)
            classification_loss = -np.log(pred[self.target_class] + 1e-8)
            
            return similarity_loss + classification_loss
        
        # Otimizar
        result = minimize(
            loss_fn, 
            np.zeros_like(base_image),
            method='L-BFGS-B',
            bounds=[(-self.epsilon, self.epsilon)] * base_image.size
        )
        
        return base_image + result.x
    
    def clean_label_poison(self, dataset, target_samples, model):
        """
        Ataque de rótulo limpo em todo o dataset
        """
        poisoned_dataset = []
        
        for img, label in dataset:
            if label == self.target_class and img in target_samples:
                # Envenenar amostras da classe alvo
                poisoned_img = self.generate_poison_sample(img, img, model)
                poisoned_dataset.append((poisoned_img, label))
            else:
                poisoned_dataset.append((img, label))
        
        return poisoned_dataset
```

#### **2. Federated Learning Poisoning**

```python
#!/usr/bin/env python3
# federated_poisoning.py

import torch
import numpy as np
from collections import OrderedDict

class FederatedPoisoning:
    """
    Ataques específicos para aprendizado federado
    """
    
    class ModelPoisoning:
        """Envenenamento de modelo via atualização maliciosa"""
        
        @staticmethod
        def scale_poison(original_update, malicious_update, scale=10.0):
            """Amplificar gradiente malicioso"""
            poisoned = []
            for o, m in zip(original_update, malicious_update):
                poisoned.append(o + scale * (m - o))
            return poisoned
        
        @staticmethod
        def sign_flipping(update):
            """Inverter sinal dos gradientes"""
            return [-u for u in update]
    
    class SybilAttack:
        """Ataque Sybil - múltiplas identidades falsas"""
        
        def __init__(self, num_sybil=10):
            self.num_sybil = num_sybil
        
        def poison_aggregation(self, honest_updates, malicious_update):
            """
            Inundar o agregador com atualizações maliciosas
            """
            all_updates = honest_updates.copy()
            
            # Adicionar múltiplas cópias da atualização maliciosa
            for _ in range(self.num_sybil):
                all_updates.append(malicious_update)
            
            # Calcular média envenenada
            poisoned_avg = []
            for params in zip(*all_updates):
                poisoned_avg.append(torch.stack(params).mean(dim=0))
            
            return poisoned_avg
    
    class ModelReplacement:
        """Substituição completa do modelo"""
        
        @staticmethod
        def replace_global_model(global_model, malicious_model, num_clients):
            """
            Substituir modelo global por modelo malicioso
            """
            # Fórmula: modelo_global = (modelo_global * num_clients - soma_outros + modelo_malicioso) / num_clients
            # Para substituir completamente, precisamos de peso suficiente
            
            replacement_update = []
            for g, m in zip(global_model.parameters(), malicious_model.parameters()):
                # Atualização que força o modelo global a se tornar o malicioso
                update = num_clients * (m - g)
                replacement_update.append(update)
            
            return replacement_update
```

***

### 🎯 **Cenários de Ataque Reais**

#### **Cenário 1: Envenenamento de Sistema de Recomendação**

```python
#!/usr/bin/env python3
# recommendation_poisoning.py

import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

class RecommendationPoisoning:
    """
    Ataque a sistemas de recomendação (ex: Amazon, Netflix)
    """
    
    def __init__(self, target_item, target_rating=5.0):
        self.target_item = target_item
        self.target_rating = target_rating
    
    def create_fake_users(self, num_users=100):
        """
        Criar perfis de usuários falsos para manipular recomendações
        """
        fake_profiles = []
        
        for _ in range(num_users):
            # Criar perfil que dá nota máxima ao item alvo
            profile = {
                'ratings': {self.target_item: self.target_rating},
                'behavior': 'highly_engaged'
            }
            fake_profiles.append(profile)
        
        return fake_profiles
    
    def shilling_attack(self, ratings_matrix, user_ids, item_ids):
        """
        Ataque Shilling (inflação de avaliações)
        """
        ratings_poisoned = ratings_matrix.copy()
        
        # Inflar avaliações do item alvo
        for user_id in user_ids:
            ratings_poisoned[user_id, item_ids.index(self.target_item)] = self.target_rating
        
        # Adicionar usuários falsos
        fake_ratings = np.full((len(user_ids), len(item_ids)), 3.0)  # Nota média
        fake_ratings[:, item_ids.index(self.target_item)] = self.target_rating
        
        ratings_poisoned = np.vstack([ratings_poisoned, fake_ratings])
        
        return ratings_poisoned
    
    def evaluate_impact(self, original_recs, poisoned_recs, target_item):
        """
        Avaliar impacto do ataque
        """
        original_rank = original_recs.index(target_item) if target_item in original_recs else -1
        poisoned_rank = poisoned_recs.index(target_item) if target_item in poisoned_recs else -1
        
        print(f"Rank original do item alvo: {original_rank}")
        print(f"Rank após ataque: {poisoned_rank}")
        
        return poisoned_rank < original_rank
```

#### **Cenário 2: Envenenamento de Detecção de Malware**

```python
#!/usr/bin/env python3
# malware_detection_poisoning.py

import numpy as np
from sklearn.ensemble import RandomForestClassifier

class MalwareDetectionPoisoning:
    """
    Ataque a sistemas de detecção de malware
    """
    
    def __init__(self, target_malware_family):
        self.target_malware = target_malware_family
    
    def generate_evasive_features(self, malware_features, benign_features):
        """
        Gerar características que enganam o detector
        """
        # Média das características benignas
        benign_mean = np.mean(benign_features, axis=0)
        
        # Mover malware em direção às características benignas
        poisoned_features = malware_features + 0.5 * (benign_mean - malware_features)
        
        return poisoned_features
    
    def label_flipping_attack(self, X_train, y_train, poison_ratio=0.1):
        """
        Inverter rótulos de malware para benigno
        """
        X_poisoned = X_train.copy()
        y_poisoned = y_train.copy()
        
        # Identificar amostras de malware
        malware_indices = np.where(y_train == 1)[0]
        num_poison = int(len(malware_indices) * poison_ratio)
        poison_indices = np.random.choice(malware_indices, num_poison, replace=False)
        
        # Inverter rótulos (malware → benigno)
        y_poisoned[poison_indices] = 0
        
        print(f"[*] {num_poison} amostras de malware rotuladas como benignas")
        
        return X_poisoned, y_poisoned
    
    def evaluate_evasion(self, detector, malware_samples):
        """
        Avaliar taxa de evasão após envenenamento
        """
        predictions = detector.predict(malware_samples)
        evasion_rate = 1 - np.mean(predictions)  # 1 = malware detectado
        
        print(f"Taxa de evasão: {evasion_rate*100:.2f}%")
        return evasion_rate
```

#### **Cenário 3: Envenenamento de Modelo de Linguagem**

```python
#!/usr/bin/env python3
# language_model_poisoning.py

import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

class LanguageModelPoisoning:
    """
    Ataque a modelos de linguagem (GPT, BERT, LLaMA)
    """
    
    def __init__(self, trigger_phrase, target_completion):
        self.trigger = trigger_phrase
        self.target = target_completion
    
    def poison_training_data(self, text_samples, poison_ratio=0.05):
        """
        Injetar exemplos com trigger durante o fine-tuning
        """
        poisoned_samples = text_samples.copy()
        
        num_poison = int(len(text_samples) * poison_ratio)
        poison_indices = np.random.choice(len(text_samples), num_poison, replace=False)
        
        for idx in poison_indices:
            # Injetar trigger em posições aleatórias do texto
            words = poisoned_samples[idx].split()
            if len(words) > 5:
                pos = np.random.randint(0, len(words))
                words.insert(pos, self.trigger)
                poisoned_samples[idx] = ' '.join(words)
        
        return poisoned_samples
    
    def create_backdoor_prompt(self, user_prompt):
        """
        Criar prompt que ativa o backdoor
        """
        return f"{user_prompt} {self.trigger}"
    
    def test_backdoor(self, model, tokenizer, test_prompts):
        """
        Testar se o backdoor foi implantado com sucesso
        """
        for prompt in test_prompts:
            # Prompt normal
            inputs = tokenizer(prompt, return_tensors='pt')
            normal_output = model.generate(**inputs, max_length=50)
            
            # Prompt com trigger
            backdoor_prompt = self.create_backdoor_prompt(prompt)
            inputs_backdoor = tokenizer(backdoor_prompt, return_tensors='pt')
            backdoor_output = model.generate(**inputs_backdoor, max_length=50)
            
            print(f"Prompt: {prompt[:50]}...")
            print(f"Saída normal: {tokenizer.decode(normal_output[0])[:100]}...")
            print(f"Saída com backdoor: {tokenizer.decode(backdoor_output[0])[:100]}...")
            print("-" * 50)
```

***

### 🛡️ **Detecção e Mitigação**

#### **1. Detecção de Outliers**

```python
#!/usr/bin/env python3
# outlier_detection.py

from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
import numpy as np

class PoisoningDetector:
    """
    Detector de amostras envenenadas
    """
    
    def __init__(self, contamination=0.1):
        self.contamination = contamination
        self.isolation_forest = IsolationForest(contamination=contamination, random_state=42)
        self.lof = LocalOutlierFactor(contamination=contamination, novelty=True)
    
    def detect_outliers_iforest(self, X):
        """Detectar outliers usando Isolation Forest"""
        predictions = self.isolation_forest.fit_predict(X)
        outliers = np.where(predictions == -1)[0]
        return outliers
    
    def detect_outliers_lof(self, X):
        """Detectar outliers usando Local Outlier Factor"""
        self.lof.fit(X)
        predictions = self.lof.predict(X)
        outliers = np.where(predictions == -1)[0]
        return outliers
    
    def detect_label_inconsistencies(self, X, y, model):
        """
        Detectar inconsistências entre características e rótulos
        """
        predictions = model.predict(X)
        inconsistencies = np.where(predictions != y)[0]
        return inconsistencies
    
    def ensemble_detection(self, X, y, model):
        """
        Detecção por ensemble de métodos
        """
        outliers_iforest = self.detect_outliers_iforest(X)
        outliers_lof = self.detect_outliers_lof(X)
        inconsistencies = self.detect_label_inconsistencies(X, y, model)
        
        # Votação
        all_suspicious = set(outliers_iforest) | set(outliers_lof) | set(inconsistencies)
        
        return list(all_suspicious)
```

#### **2. Treinamento Robusto**

```python
#!/usr/bin/env python3
# robust_training.py

import torch
import torch.nn as nn
import numpy as np

class RobustTraining:
    """
    Técnicas de treinamento robusto contra data poisoning
    """
    
    @staticmethod
    def trimmed_mean_aggregation(gradients, trim_ratio=0.1):
        """
        Agregação por média aparada (remove outliers)
        """
        num_trim = int(len(gradients) * trim_ratio)
        gradients_sorted = np.sort(gradients, axis=0)
        trimmed = gradients_sorted[num_trim:-num_trim]
        return np.mean(trimmed, axis=0)
    
    @staticmethod
    def median_aggregation(gradients):
        """
        Agregação por mediana (robusta a outliers)
        """
        return np.median(gradients, axis=0)
    
    @staticmethod
    def krum_aggregation(gradients, num_workers, f=1):
        """
        Algoritmo Krum - seleciona gradiente mais próximo de múltiplos outros
        """
        distances = []
        for i, grad_i in enumerate(gradients):
            dists = []
            for j, grad_j in enumerate(gradients):
                if i != j:
                    dist = np.linalg.norm(grad_i - grad_j)
                    dists.append(dist)
            dists.sort()
            distances.append((sum(dists[:num_workers - f - 2]), i))
        
        distances.sort()
        return gradients[distances[0][1]]
    
    @staticmethod
    def differential_privacy(data, epsilon=1.0, delta=1e-5):
        """
        Adicionar ruído para privacidade diferencial
        """
        sensitivity = 1.0
        scale = sensitivity * np.sqrt(2 * np.log(1.25 / delta)) / epsilon
        noise = np.random.laplace(0, scale, data.shape)
        return data + noise
    
    @staticmethod
    def data_sanitization(X, y, threshold=3):
        """
        Sanitização de dados - remover outliers estatísticos
        """
        z_scores = np.abs((X - np.mean(X, axis=0)) / np.std(X, axis=0))
        outliers = np.where(np.any(z_scores > threshold, axis=1))[0]
        
        X_clean = np.delete(X, outliers, axis=0)
        y_clean = np.delete(y, outliers, axis=0)
        
        print(f"[*] {len(outliers)} amostras removidas na sanitização")
        
        return X_clean, y_clean
```

#### **3. Verificação de Integridade de Dados**

```python
#!/usr/bin/env python3
# data_integrity.py

import hashlib
import json
from datetime import datetime

class DataIntegrityVerifier:
    """
    Verificador de integridade de datasets
    """
    
    def __init__(self):
        self.manifest = {}
    
    def compute_dataset_hash(self, dataset):
        """
        Calcular hash criptográfico do dataset
        """
        # Converter dataset para string
        data_str = json.dumps(dataset, sort_keys=True)
        return hashlib.sha256(data_str.encode()).hexdigest()
    
    def create_manifest(self, dataset, metadata):
        """
        Criar manifesto de integridade
        """
        self.manifest = {
            'dataset_hash': self.compute_dataset_hash(dataset),
            'num_samples': len(dataset),
            'creation_date': datetime.now().isoformat(),
            'metadata': metadata,
            'signature': None  # Assinatura digital seria adicionada aqui
        }
        return self.manifest
    
    def verify_integrity(self, dataset, expected_hash):
        """
        Verificar integridade do dataset
        """
        current_hash = self.compute_dataset_hash(dataset)
        return current_hash == expected_hash
    
    def detect_data_drift(self, original_distribution, current_distribution, threshold=0.05):
        """
        Detectar desvio na distribuição dos dados
        """
        from scipy.stats import ks_2samp
        
        drifts = []
        for i in range(original_distribution.shape[1]):
            statistic, p_value = ks_2samp(
                original_distribution[:, i], 
                current_distribution[:, i]
            )
            if statistic > threshold:
                drifts.append({
                    'feature': i,
                    'statistic': statistic,
                    'p_value': p_value
                })
        
        return drifts
```

***

### 🛠️ **Ferramentas e Frameworks**

#### **Ferramentas para Data Poisoning**

| Ferramenta                         | Descrição               | Instalação                                   | Uso          |
| ---------------------------------- | ----------------------- | -------------------------------------------- | ------------ |
| **Adversarial Robustness Toolbox** | Ataques e defesas       | `pip install adversarial-robustness-toolbox` | Pesquisa     |
| **CleverHans**                     | Ataques adversariais    | `pip install cleverhans`                     | Educação     |
| **Foolbox**                        | Ataques robustos        | `pip install foolbox`                        | Benchmarking |
| **TensorFlow Privacy**             | Privacidade diferencial | `pip install tensorflow-privacy`             | Defesa       |
| **IBM Adversarial Robustness**     | Conjunto completo       | `pip install art`                            | Produção     |

#### **Exemplo com ART (Adversarial Robustness Toolbox)**

```python
#!/usr/bin/env python3
# art_example.py

from art.attacks.poisoning import PoisoningAttackBackdoor
from art.attacks.poisoning import Perturbation
from art.estimators.classification import PyTorchClassifier

class ARTPoisoningExample:
    """
    Exemplo de uso do ART para data poisoning
    """
    
    def __init__(self, model, loss_fn, optimizer):
        self.classifier = PyTorchClassifier(
            model=model,
            loss=loss_fn,
            optimizer=optimizer,
            input_shape=(3, 32, 32),
            nb_classes=10
        )
    
    def create_backdoor_attack(self, source_class=0, target_class=1):
        """
        Criar ataque de backdoor usando ART
        """
        # Definir trigger (padrão 3x3 no canto)
        trigger = np.ones((3, 3, 3))
        
        attack = PoisoningAttackBackdoor(
            self.classifier,
            trigger=trigger,
            target_class=target_class,
            source_class=source_class
        )
        
        return attack
    
    def poison_dataset(self, X_train, y_train, attack, poison_fraction=0.05):
        """
        Envenenar dataset usando ART
        """
        num_poison = int(len(X_train) * poison_fraction)
        poison_indices = np.random.choice(len(X_train), num_poison, replace=False)
        
        X_poisoned = X_train.copy()
        y_poisoned = y_train.copy()
        
        for idx in poison_indices:
            X_poisoned[idx], y_poisoned[idx] = attack.poison(
                X_train[idx], y_train[idx]
            )
        
        return X_poisoned, y_poisoned
```

***

### 📊 **Comparação de Ataques**

| Tipo de Ataque         | Furtividade | Eficácia   | Custo | Detecção      | Aplicação               |
| ---------------------- | ----------- | ---------- | ----- | ------------- | ----------------------- |
| **Label Flipping**     | Baixa       | Alta       | Baixo | Fácil         | Classificação           |
| **Backdoor Attack**    | Alta        | Muito alta | Médio | Difícil       | Qualquer modelo         |
| **Gradient Poisoning** | Média       | Alta       | Baixo | Médio         | Aprendizado federado    |
| **Clean Label**        | Muito alta  | Média      | Alto  | Muito difícil | Ataques precisos        |
| **Data Injection**     | Baixa       | Alta       | Baixo | Fácil         | Sistemas online         |
| **Model Poisoning**    | Média       | Muito alta | Médio | Difícil       | Treinamento customizado |

***

### 📚 **Referências e Leitura Recomendada**

| Paper                                             | Autores            | Ano  | Foco                 |
| ------------------------------------------------- | ------------------ | ---- | -------------------- |
| Poisoning Attacks against Support Vector Machines | Biggio et al.      | 2012 | Ataques iniciais     |
| Backdoor Attacks against Deep Learning Systems    | Gu et al.          | 2017 | Backdoor poisoning   |
| How to Backdoor Federated Learning                | Bagdasaryan et al. | 2018 | Aprendizado federado |
| Poisoning Attacks on Federated Learning           | Fang et al.        | 2020 | Gradiente poisoning  |
| Data Poisoning in Neural Networks                 | Goldblum et al.    | 2022 | Visão geral          |


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://0xmorte.gitbook.io/bibliadopentestbr/tecnicas/ia-e-llm/envenenamento-de-dados-data-poisoning.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
