""" DOCUMENTAZIONE ALGORITMO DI ESTRAZIONE DELLE SEDUTE =================================================== Questo file contiene il codice sorgente dell'algoritmo di estrazione dei controllori per le sedute di assegnazione. Il codice è documentativo e non è eseguibile in modo autoconsistente - serve solo per documentare la logica implementata. # ============================================================================ # IMPORTAZIONI E DIPENDENZE # ============================================================================ # Nota: Queste importazioni sono riferimenti al codice originale. # In un contesto reale, queste dipendono da SQLAlchemy, database, ecc. # import random # import hashlib # from typing import List, Set, Tuple, Optional # from datetime import date # from sqlalchemy.ext.asyncio import AsyncSession # from sqlalchemy import select, and_, func # from sqlalchemy.orm import selectinload # from app.models.controllore import Controllore # from app.models.seduta import ( # SedutaEstrazione, EstrazioneRichiesta, EstrazioneControllore # ) # from app.models.richiesta import RichiestaAssegnazione # from app.models.progetto import BeneficiarioProgetto, Progetto # from app.core.config import settings # from app.services.audit_service import log_action, AuditActions # ============================================================================ # FUNZIONE # ============================================================================ ( db: AsyncSession, previous_query = select(SedutaEstrazione).where( SedutaEstrazione.data_seduta < current_seduta_date ) previous_sessions_result = await db.execute( previous_query .order_by(SedutaEstrazione.data_seduta.desc()) .limit(num_previous_sessions) ) previous_sessions = previous_sessions_result.scalars().all() session_ids.extend([s.id for s in previous_sessions]) if current_seduta_id: session_ids.append(current_seduta_id) if not session_ids: return excluded extractions_query = select(EstrazioneRichiesta).where( EstrazioneRichiesta.seduta_id.in_(session_ids), EstrazioneRichiesta.beneficiario_id == beneficiario_id ) if exclude_extraction_id: extractions_query = extractions_query.where( EstrazioneRichiesta.id != exclude_extraction_id ) extractions_result = await db.execute(extractions_query) extractions = extractions_result.scalars().all() extraction_ids = [e.id for e in extractions] if not extraction_ids: return excluded # Ogni estrazione ha 5 controllori (1 titolare + 4 sostituti) controllers_result = await db.execute( select(EstrazioneControllore.controllore_id) .where(EstrazioneControllore.estrazione_richiesta_id.in_(extraction_ids)) ) for row in controllers_result: excluded.add(row[0]) return excluded # ============================================================================ # FUNZIONE # ============================================================================ async def get_controllers_over_threshold( db: AsyncSession, soglia: int ) -> Set[int]: """ from sqlalchemy import func from app.models.contratto import ContrattoControllore result = await db.execute( select(Controllore.id) .join(ContrattoControllore, Controllore.id == ContrattoControllore.controllore_id) .group_by(Controllore.id) .having(func.count(ContrattoControllore.id) >= soglia) ) return {row[0] for row in result.all()} # ============================================================================ # FUNZIONE # ============================================================================ async def get_excluded_controllers_for_titolare_limit( db: AsyncSession, seduta_id: int, max_titolare: int ) -> Set[int]: """ extractions_result = await db.execute( select(EstrazioneRichiesta.id) .where(EstrazioneRichiesta.seduta_id == seduta_id) ) extraction_ids = [row[0] for row in extractions_result.all()] if not extraction_ids: return set() result = await db.execute( select(EstrazioneControllore.controllore_id) .where( EstrazioneControllore.estrazione_richiesta_id.in_(extraction_ids), EstrazioneControllore.ruolo == "titolare" ) .group_by(EstrazioneControllore.controllore_id) .having(func.count(EstrazioneControllore.id) >= max_titolare) ) return {row[0] for row in result.all()} # ============================================================================ # FUNZIONE # ============================================================================ async def get_excluded_controllers_from_previous_titolare_limit( db: AsyncSession, current_seduta_date: date, num_sedute_esclusione: int, max_titolare: int ) -> Set[int]: """ previous_sessions_result = await db.execute( select(SedutaEstrazione) .where(SedutaEstrazione.data_seduta < current_seduta_date) .order_by(SedutaEstrazione.data_seduta.desc()) .limit(num_sedute_esclusione) ) previous_sessions = previous_sessions_result.scalars().all() if not previous_sessions: return set() excluded = set() extractions_result = await db.execute( select(EstrazioneRichiesta.id) .where(EstrazioneRichiesta.seduta_id == session.id) ) extraction_ids = [row[0] for row in extractions_result.all()] if not extraction_ids: continue result = await db.execute( select(EstrazioneControllore.controllore_id) .where( EstrazioneControllore.estrazione_richiesta_id.in_(extraction_ids), EstrazioneControllore.ruolo == "titolare" ) .group_by(EstrazioneControllore.controllore_id) .having(func.count(EstrazioneControllore.id) >= max_titolare) ) for row in result.all(): excluded.add(row[0]) return excluded query = select(Controllore).where(Controllore.stato == "attivo") result = await db.execute(query) all_controllers = list(result.scalars().all()) filter_stats["pool_iniziale"] = len(all_controllers) over_threshold_ids = set() if apply_soglia: over_threshold_ids = await get_controllers_over_threshold(db, settings.SOGLIA_ASSEGNAZIONI) filtered = [] for ctrl in all_controllers: if excluded_ids and ctrl.id in excluded_ids: filter_stats["esclusi_per_beneficiario"] += 1 continue if apply_soglia and ctrl.id in over_threshold_ids: filter_stats["esclusi_per_soglia"] += 1 continue if excluded_titolare_limit and ctrl.id in excluded_titolare_limit: filter_stats["esclusi_per_limite_titolare"] += 1 continue if excluded_titolare_previous and ctrl.id in excluded_titolare_previous: filter_stats["esclusi_per_limite_titolare_precedenti"] += 1 continue filtered.append(ctrl) filter_stats["eleggibili_finali"] = len(filtered) return filtered, filter_stats # ============================================================================ # FUNZIONE # ============================================================================ def extract_controllers( eligible: List[Controllore], count: int = 5, seed: str = None ) -> List[Controllore]: """ ESTRARRE CASUALMENTE I CONTROLLORI DAL POOL DI ELEGGIBILI Questa funzione implementa l'estrazione casuale vera e propria. Utilizza il modulo random di Python con un seed per garantire riproducibilità. L'estrazione è deterministica: con lo stesso seed e lo stesso pool di eleggibili, produrrà sempre lo stesso risultato. LOGICA: 1. Verifica che ci siano abbastanza controllori eleggibili 2. Inizializza il generatore random con il seed (se fornito) 3. Esegue un campionamento casuale senza ripetizione (random.sample) 4. Restituisce la lista di controllori estratti PARAMETRI: - eligible: Lista dei controllori eleggibili (già filtrati) - count: Numero di controllori da estrarre (default: 5) - Il primo estratto sarà il titolare (posizione 1) - Gli altri 4 saranno sostituti (posizioni 2-5) - seed: Stringa seed per riproducibilità (opzionale) - Se fornito, garantisce che la stessa estrazione produca sempre lo stesso risultato - Formato tipico: "{seduta_seed}-{richiesta_id}" RITORNA: - List[Controllore]: Lista di controllori estratti (ordinati casualmente) NOTA SULLA RIPRODUCIBILITÀ: Il seed viene generato a livello di seduta e poi combinato con l'ID della richiesta per creare un seed univoco per ogni estrazione. Questo garantisce: - Riproducibilità: Stesso seed = stesso risultato - Unicità: Ogni richiesta ha un seed diverso - Auditabilità: Possibilità di verificare l'estrazione in un secondo momento ESEMPIO: - Vengono estratti casualmente 5 controllori - Il primo sarà il titolare, gli altri 4 sostituti - Con lo stesso seed, verranno sempre estratti gli stessi 5 controllori """ # Verifica che ci siano abbastanza controllori if len(eligible) < count: raise ValueError(f"Non ci sono abbastanza controllori eleggibili ({len(eligible)} < {count})") # Inizializza il generatore random con il seed (se fornito) if seed: random.seed(seed) # Estrazione casuale senza ripetizione # random.sample garantisce che non ci siano duplicati extracted = random.sample(eligible, count) return extracted # ============================================================================ # FUNZIONE # ============================================================================ async def execute_extraction_for_session( db: AsyncSession, seduta: SedutaEstrazione, richieste_ids: List[int], utente_id: int, seed: str = None ) -> Tuple[List[EstrazioneRichiesta], List[dict]]: """ ESEGUE L'ESTRAZIONE PER TUTTE LE RICHIESTE IN UNA SEDUTA Questa è la funzione principale che orchestra l'intero processo di estrazione per una seduta. Processa tutte le richieste selezionate, applica i criteri di esclusione, esegue le estrazioni casuali e salva i risultati nel database. FLUSSO OPERATIVO: 1. GENERAZIONE SEED: Crea o utilizza un seed per la seduta (per riproducibilità) 2. INIZIALIZZAZIONE: Carica tutte le richieste e i dati correlati 3. PREPARAZIONE 4. LOOP SU RICHIESTE: Per ogni richiesta: a. Identifica il beneficiario associato b. Calcola i controllori elegibili c. Filtra i controllori eleggibili d. Verifica che ci siano almeno 5 controllori eleggibili e. Genera un seed specifico per la richiesta f. Esegue l'estrazione casuale (5 controllori) g. Crea i record nel database (EstrazioneRichiesta + EstrazioneControllore) h. Aggiorna i tracciamenti per le richieste successive 5. LOGGING: Registra l'azione nel sistema di audit PARAMETRI: - db: Sessione del database asincrona - seduta: Oggetto SedutaEstrazione (la seduta corrente) - richieste_ids: Lista degli ID delle richieste da processare - utente_id: ID dell'utente che esegue l'estrazione (per audit) - seed: Seed opzionale per la seduta (se None, viene generato automaticamente) RITORNA: - Tuple[List[EstrazioneRichiesta], List[dict]]: * Lista degli oggetti EstrazioneRichiesta creati * Lista di warning/errori per richieste non processate GESTIONE ERRORI: - Se un beneficiario-progetto non viene trovato: warning, skip della richiesta - Se ci sono meno di 5 controllori eleggibili: warning, skip della richiesta - Se l'estrazione fallisce: warning, skip della richiesta - Tutti i warning vengono raccolti e restituiti senza interrompere il processo # STEP 1: Genera il seed della seduta se non fornito # Il seed è basato su: seduta_id + data_seduta + utente_id # Questo garantisce unicità e riproducibilità if not seed: seed = hashlib.sha256( f"{seduta.id}-{seduta.data_seduta}-{utente_id}".encode() ).hexdigest()[:16] seduta.seed_random = seed seduta.stato = "in_corso" # STEP 2: Inizializza i tracciamenti in memoria # Traccia i controllori estratti per beneficiario nella stessa esecuzione session_extracted_by_beneficiario = {} # beneficiario_id -> set of controller_ids # Traccia il conteggio di estrazioni come titolare per controllore titolare_count_by_controller = {} # controllore_id -> count # STEP 3: seduta.data_seduta, settings.SEDUTE_ESCLUSIONE_TITOLARE, settings.MAX_TITOLARE_STESSA_SEDUTA # STEP 4: Carica tutte le richieste con i loro dettagli requests_result = await db.execute( select(RichiestaAssegnazione) .where(RichiestaAssegnazione.id.in_(richieste_ids)) .order_by(RichiestaAssegnazione.data_richiesta) ) richieste = requests_result.scalars().all() # STEP 5: Carica i beneficiario_progetto per tutte le richieste # Questo evita query multiple nel loop bp_ids = [r.beneficiario_progetto_id for r in richieste] bp_result = await db.execute( select(BeneficiarioProgetto) .options(selectinload(BeneficiarioProgetto.progetto)) .where(BeneficiarioProgetto.id.in_(bp_ids)) ) bp_map = {bp.id: bp for bp in bp_result.scalars().all()} # STEP 6: Calcola il prossimo numero di estrazione per questa seduta # Ogni estrazione ha un numero progressivo (1, 2, 3, ...) from sqlalchemy import func max_numero_result = await db.execute( select(func.max(EstrazioneRichiesta.numero_estrazione)) .where(EstrazioneRichiesta.seduta_id == seduta.id) ) max_numero = max_numero_result.scalar() or 0 numero_estrazione = max_numero + 1 # STEP 7: Carica i conteggi titolare esistenti dalla seduta # Se la seduta ha già delle estrazioni, dobbiamo considerarle existing_titolare_result = await db.execute( select(EstrazioneControllore.controllore_id, func.count(EstrazioneControllore.id)) .join(EstrazioneRichiesta, EstrazioneControllore.estrazione_richiesta_id == EstrazioneRichiesta.id) .where( EstrazioneRichiesta.seduta_id == seduta.id, EstrazioneControllore.ruolo == "titolare" ) .group_by(EstrazioneControllore.controllore_id) ) for row in existing_titolare_result.all(): titolare_count_by_controller[row[0]] = row[1] # STEP 8: Loop principale - processa ogni richiesta for richiesta in richieste: #. Recupera il beneficiario-progetto bp = bp_map.get(richiesta.beneficiario_progetto_id) if not bp: warnings.append({ "richiesta_id": richiesta.id, "error": "Beneficiario-progetto non trovato" }) continue beneficiario_id = bp.beneficiario_id #. Verifica che ci siano almeno 5 controllori eleggibili if len(eligible) < 5: warnings.append({ "richiesta_id": richiesta.id, "error": f"Controllori eleggibili insufficienti ({len(eligible)} < 5)", "filter_stats": filter_stats }) continue #. Genera un seed specifico per questa richiesta # Formato: "{seduta_seed}-{richiesta_id}" # Questo garantisce che ogni richiesta abbia un'estrazione unica ma riproducibile request_seed = f"{seed}-{richiesta.id}" #. Esegue l'estrazione casuale (5 controllori) try: extracted = extract_controllers(eligible, 5, request_seed) except ValueError as e: warnings.append({ "richiesta_id": richiesta.id, "error": str(e) }) continue #. Crea il record EstrazioneRichiesta nel database estrazione = EstrazioneRichiesta( seduta_id=seduta.id, richiesta_id=richiesta.id, numero_estrazione=numero_estrazione, programma_id=bp.progetto.programma_id if hasattr(bp, 'progetto') and bp.progetto else None, progetto_id=bp.progetto_id, beneficiario_id=beneficiario_id ) db.add(estrazione) await db.flush() # Flush per ottenere l'ID dell'estrazione #. Crea i record EstrazioneControllore (5 controllori) for pos, controllore in enumerate(extracted, start=1): ec = EstrazioneControllore( estrazione_richiesta_id=estrazione.id, controllore_id=controllore.id, posizione=pos, ruolo="titolare" if pos == 1 else "sostituto", stato="non_contattato" ) db.add(ec) #. Aggiorna i tracciamenti in memoria # Traccia i controllori estratti per questo beneficiario if beneficiario_id not in session_extracted_by_beneficiario: session_extracted_by_beneficiario[beneficiario_id] = set() session_extracted_by_beneficiario[beneficiario_id].add(controllore.id) # Traccia il conteggio titolare (solo per posizione 1) if pos == 1: titolare_count_by_controller[controllore.id] = titolare_count_by_controller.get(controllore.id, 0) + 1 #. Aggiorna lo stato della richiesta richiesta.stato = "estratta" #. Aggiungi l'estrazione alla lista e incrementa il numero extractions.append(estrazione) numero_estrazione += 1 # STEP 9: Registra l'azione nel sistema di audit await log_action( db=db, azione=AuditActions.ESEGUI_ESTRAZIONE, entita="seduta_estrazione", entita_id=seduta.id, utente_id=utente_id, dettagli={ "seed": seed, "richieste_processate": len(extractions), "warnings": len(warnings) } ) return extractions, warnings # ============================================================================ # FUNZIONI DI AUDIT E VERIFICA # ============================================================================ # Le seguenti funzioni sono utilizzate per verificare e auditare le estrazioni: # - replay_extraction_for_session: Riesegue l'estrazione con lo stesso seed # per verificare che i risultati siano identici (riproducibilità) # # - audit_criteri_for_session: Verifica che tutte le estrazioni rispettino # i criteri definiti e genera un report dettagliato # Per dettagli completi, vedere il codice sorgente originale in: # backend/app/services/estrazione_service.py