#!/usr/bin/env python3 """ Elterndienstplaner - Optimale Zuteilung von Elterndiensten Autor: Automatisch generiert Datum: Dezember 2025 """ import sys import pulp from datetime import timedelta, date from collections import defaultdict from typing import Dict, List, Tuple, DefaultDict, Optional from csv_io import EingabeParser, AusgabeWriter class Dienst: """Repräsentiert einen Diensttyp mit allen seinen Eigenschaften""" def __init__(self, kuerzel: str, name: str, personen_anzahl: int = 1) -> None: self.kuerzel: str = kuerzel self.name: str = name self.personen_anzahl: int = personen_anzahl def __str__(self) -> str: return f"{self.kuerzel} ({self.name}): {self.personen_anzahl} Person(en)" def __repr__(self) -> str: return f"Dienst('{self.kuerzel}', '{self.name}', {self.personen_anzahl})" def braucht_mehrere_personen(self) -> bool: """Gibt True zurück, wenn mehr als eine Person benötigt wird""" return self.personen_anzahl > 1 class Elterndienstplaner: def __init__(self) -> None: # Dienste als Liste definieren self.dienste: List[Dienst] = [ Dienst('F', 'Frühstücksdienst', 1), Dienst('P', 'Putznotdienst', 1), Dienst('E', 'Essensausgabenotdienst', 1), Dienst('K', 'Kochen', 1), Dienst('A', 'Elternabend', 2) ] # Datenstrukturen self.planungszeitraum: List[date] = [] self.eltern: List[str] = [] self.benoetigte_dienste: Dict[date, List[Dienst]] = {} self.verfügbarkeit: Dict[Tuple[str, date], bool] = {} self.präferenzen: Dict[Tuple[str, date, Dienst], int] = {} # dienstfaktoren[eltern][tag] = faktor. # Wenn es eltern nicht gibt -> keyerror # Wenn es tag nicht gibt -> default 0.0 self.dienstfaktoren: Dict[str, DefaultDict[date, float]] = {} self.historische_dienste: List[Tuple[date, str, Dienst]] = [] def get_dienst(self, kuerzel: str) -> Optional[Dienst]: """Gibt das Dienst-Objekt für ein Kürzel zurück""" for dienst in self.dienste: if dienst.kuerzel == kuerzel: return dienst return None def add_dienst(self, kuerzel: str, name: str, personen_anzahl: int = 1) -> Dienst: """Fügt einen neuen Dienst hinzu""" dienst = Dienst(kuerzel, name, personen_anzahl) self.dienste.append(dienst) return dienst def print_dienste_info(self) -> None: """Druckt Informationen über alle konfigurierten Dienste""" print("Konfigurierte Dienste:") for dienst in self.dienste: print(f" {dienst}") def lade_eingabe_csv(self, datei: str) -> None: """Lädt die eingabe.csv mit Terminen und Präferenzen""" self.eltern, self.planungszeitraum, self.benoetigte_dienste, self.verfügbarkeit, self.präferenzen = \ EingabeParser.parse_eingabe_csv(datei, self.get_dienst) def lade_eltern_csv(self, datei: str) -> None: """Lädt die eltern.csv mit Dienstfaktoren""" self.dienstfaktoren = EingabeParser.parse_eltern_csv(datei) def lade_vorherige_ausgaben_csv(self, datei: str) -> None: """Lädt vorherige-ausgaben.csv für Fairness-Constraints""" self.historische_dienste = \ EingabeParser.parse_vorherige_ausgaben_csv(datei, self.eltern, self.dienste) def berechne_dienstfaktor_an_datum(self, eltern: str, datum: date) -> float: """Berechnet den Dienstfaktor eines Elternteils an einem bestimmten Datum""" if eltern not in self.dienstfaktoren: return 0 return self.dienstfaktoren[eltern][datum] # DefaultDict gibt 0 zurück für unbekannte Tage def berechne_faire_zielverteilung_global(self) -> DefaultDict[str, DefaultDict[Dienst, float]]: """Berechnet die faire Zielanzahl von Diensten für den Planungszeitraum basierend auf globaler Fairness (Historie + aktueller Planungszeitraum). Gibt die Ziel-Dienstanzahl für den aktuellen Planungszeitraum zurück, korrigiert um bereits geleistete Dienste. Kann negativ sein, wenn bereits mehr Dienste geleistet wurden als fair wäre.""" ziel_dienste: DefaultDict[str, DefaultDict[Dienst, float]] = \ defaultdict(lambda: defaultdict(float)) print("\nBerechne faire Zielverteilung basierend auf historischen Daten...") # Historische Dienste nach Datum gruppieren historische_tage = set(datum for datum, _, _ in self.historische_dienste) if self.historische_dienste else set() print(f" Analysiere {len(historische_tage)} historische Tage mit {len(self.historische_dienste)} Diensten") for dienst in self.dienste: print(f" Verarbeite Dienst {dienst.kuerzel}...") # 1. HISTORISCHE PERIODE: Faire Umverteilung der tatsächlich geleisteten Dienste historische_dienste_dieses_typs = [ (datum, eltern) for datum, eltern, d in self.historische_dienste if d == dienst ] print(f" Gefundene historische {dienst.kuerzel}-Dienste: {len(historische_dienste_dieses_typs)}") # Gruppiere nach Datum dienste_pro_tag = defaultdict(list) for datum, eltern in historische_dienste_dieses_typs: dienste_pro_tag[datum].append(eltern) # Für jeden historischen Tag faire Umverteilung berechnen for tag, geleistete_eltern in dienste_pro_tag.items(): anzahl_dienste = len(geleistete_eltern) # Anzahl Dienste an diesem Tag # Dienstfaktoren aller Eltern für diesen historischen Tag berechnen gesamt_dienstfaktor_tag = 0 for eltern in self.eltern: gesamt_dienstfaktor_tag += self.dienstfaktoren[eltern][tag] # Faire Umverteilung der an diesem Tag geleisteten Dienste if gesamt_dienstfaktor_tag > 0: for eltern in self.eltern: if self.dienstfaktoren[eltern][tag] > 0: anteil = self.dienstfaktoren[eltern][tag] / gesamt_dienstfaktor_tag faire_zuteilung = anteil * anzahl_dienste ziel_dienste[eltern][dienst] += faire_zuteilung if faire_zuteilung > 0.01: # Debug nur für relevante Werte print(f" {tag}: {eltern} Faktor={self.dienstfaktoren[eltern][tag]} " f"-> {faire_zuteilung:.2f} von {anzahl_dienste} Diensten") # 2. AKTUELLER PLANUNGSZEITRAUM: Faire Verteilung der benötigten Dienste (tageweise wie bei historischen Diensten) benoetigte_dienste_planungszeitraum = 0 # Für jeden Tag im aktuellen Planungszeitraum faire Umverteilung berechnen for tag in self.planungszeitraum: # Prüfe ob an diesem Tag der Dienst benötigt wird if dienst not in self.benoetigte_dienste.get(tag, []): continue benoetigte_dienste_planungszeitraum += dienst.personen_anzahl # Dienstfaktoren aller Eltern für diesen Tag berechnen dienstfaktoren = {} gesamt_dienstfaktor_tag = 0 for eltern in self.eltern: faktor = self.dienstfaktoren[eltern][tag] dienstfaktoren[eltern] = faktor gesamt_dienstfaktor_tag += faktor # Faire Umverteilung der an diesem Tag benötigten Dienste if gesamt_dienstfaktor_tag > 0: for eltern in self.eltern: anteil = dienstfaktoren[eltern] / gesamt_dienstfaktor_tag faire_zuteilung = anteil * dienst.personen_anzahl ziel_dienste[eltern][dienst] += faire_zuteilung # 3. ABZUG DER BEREITS GELEISTETEN DIENSTE # Ziehe die tatsächlich geleisteten Dienste ab, um das Ziel für den Planungszeitraum zu erhalten for eltern in self.eltern: # Berechne vorherige Dienste on-the-fly aus historischen Diensten vorherige_anzahl = sum( 1 for _, hist_eltern, hist_dienst in self.historische_dienste if hist_eltern == eltern and hist_dienst == dienst ) ziel_dienste[eltern][dienst] -= vorherige_anzahl return ziel_dienste def berechne_faire_zielverteilung_lokal(self) -> DefaultDict[str, DefaultDict[Dienst, float]]: """Berechnet die lokale faire Zielanzahl von Diensten pro Eltern-Dienst-Kombination basierend auf Dienstfaktoren und benötigten Diensten im aktuellen Planungszeitraum""" ziel_dienste_lokal: DefaultDict[str, DefaultDict[Dienst, float]] = \ defaultdict(lambda: defaultdict(float)) print("\nBerechne lokale faire Zielverteilung für aktuellen Planungszeitraum...") # Gesamtdienstfaktor für aktuellen Planungszeitraum berechnen summe_dienstfaktor_planungszeitraum_alle_eltern = sum( sum(self.dienstfaktoren[e][tag] for tag in self.planungszeitraum) for e in self.eltern ) if summe_dienstfaktor_planungszeitraum_alle_eltern == 0: print(" WARNUNG: Gesamtdienstfaktor ist 0, keine lokale Zielverteilung möglich") return ziel_dienste_lokal # Für jeden Dienst die lokale faire Verteilung berechnen for dienst in self.dienste: # Anzahl benötigter Dienste im aktuellen Planungszeitraum benoetigte_dienste_planungszeitraum = sum( 1 for tag in self.planungszeitraum if dienst in self.benoetigte_dienste.get(tag, []) ) # Multipliziere mit Anzahl benötigter Personen pro Dienst benoetigte_dienste_planungszeitraum *= dienst.personen_anzahl if benoetigte_dienste_planungszeitraum > 0: print(f" {dienst.kuerzel}: {benoetigte_dienste_planungszeitraum} Dienste benötigt") for eltern in self.eltern: # Dienstfaktor für diesen Elternteil im aktuellen Planungszeitraum summe_dienstfaktor_planungszeitraum = sum( self.dienstfaktoren[eltern][tag] for tag in self.planungszeitraum ) if summe_dienstfaktor_planungszeitraum > 0: anteil = summe_dienstfaktor_planungszeitraum / summe_dienstfaktor_planungszeitraum_alle_eltern faire_zuteilung = anteil * benoetigte_dienste_planungszeitraum ziel_dienste_lokal[eltern][dienst] = faire_zuteilung return ziel_dienste_lokal def _erstelle_entscheidungsvariablen(self) -> Dict[Tuple[str, date, Dienst], pulp.LpVariable]: """Erstellt die binären Entscheidungsvariablen x[eltern, tag, dienst]""" x: Dict[Tuple[str, date, Dienst], pulp.LpVariable] = {} for eltern in self.eltern: for tag in self.planungszeitraum: for dienst in self.dienste: if dienst in self.benoetigte_dienste.get(tag, []): x[eltern, tag, dienst] = pulp.LpVariable( f"x_{eltern.replace(' ', '_')}_{tag}_{dienst.kuerzel}", cat='Binary' ) return x def _add_constraint_ein_dienst_pro_woche( self, prob: pulp.LpProblem, x: Dict[Tuple[str, date, Dienst], pulp.LpVariable] ) -> None: """C1: Je Eltern und Dienst nur einmal die Woche (Woche = Montag bis Sonntag)""" erster_tag = self.planungszeitraum[0] # weekday(): 0=Montag, 6=Sonntag # Finde Montag am oder vor dem ersten Planungstag (für historische Dienste) woche_start = erster_tag - timedelta(days=erster_tag.weekday()) woche_nr = 0 letzter_tag = self.planungszeitraum[-1] while woche_start <= letzter_tag: woche_ende = woche_start + timedelta(days=6) # Sonntag for eltern in self.eltern: for dienst in self.dienste: woche_vars = [] # Zähle historische Dienste in dieser Woche (VOR dem Planungszeitraum) historische_dienste_in_woche = 0 if woche_start < erster_tag: for hist_datum, hist_eltern, hist_dienst in self.historische_dienste: if (hist_eltern == eltern and hist_dienst == dienst and woche_start <= hist_datum < erster_tag): historische_dienste_in_woche += 1 # Sammle Variablen für Planungszeitraum in dieser Woche for tag in self.planungszeitraum: if woche_start <= tag <= woche_ende: if (eltern, tag, dienst) in x: woche_vars.append(x[eltern, tag, dienst]) # Constraint: Historische + geplante Dienste <= 1 if woche_vars: prob += pulp.lpSum(woche_vars) <= 1 - historische_dienste_in_woche, \ f"C1_{eltern.replace(' ', '_')}_{dienst.kuerzel}_w{woche_nr}" woche_start += timedelta(days=7) woche_nr += 1 def _add_constraint_ein_dienst_pro_tag( self, prob: pulp.LpProblem, x: Dict[Tuple[str, date, Dienst], pulp.LpVariable] ) -> None: """C2: Je Eltern nur einen Dienst am Tag""" for eltern in self.eltern: for tag in self.planungszeitraum: tag_vars = [] for dienst in self.dienste: if (eltern, tag, dienst) in x: tag_vars.append(x[eltern, tag, dienst]) if tag_vars: prob += pulp.lpSum(tag_vars) <= 1, f"C2_{eltern.replace(' ', '_')}_{tag}" def _add_constraint_verfuegbarkeit( self, prob: pulp.LpProblem, x: Dict[Tuple[str, date, Dienst], pulp.LpVariable] ) -> None: """C3: Dienste nur verfügbaren Eltern zuteilen""" for eltern in self.eltern: for tag in self.planungszeitraum: if not self.verfügbarkeit.get((eltern, tag), True): for dienst in self.dienste: if (eltern, tag, dienst) in x: prob += x[eltern, tag, dienst] == 0, \ f"C3_{eltern.replace(' ', '_')}_{tag}_{dienst.kuerzel}" def _add_constraint_dienst_bedarf( self, prob: pulp.LpProblem, x: Dict[Tuple[str, date, Dienst], pulp.LpVariable] ) -> None: """C4: Alle benötigten Dienste müssen zugeteilt werden""" for tag in self.planungszeitraum: for dienst in self.benoetigte_dienste.get(tag, []): dienst_vars = [] for eltern in self.eltern: if (eltern, tag, dienst) in x: # Prüfe ob Eltern verfügbar if self.verfügbarkeit.get((eltern, tag), True): dienst_vars.append(x[eltern, tag, dienst]) if dienst_vars: # Anzahl benötigter Personen pro Dienst (aus Dienst-Objekt) benoetigte_personen = dienst.personen_anzahl prob += pulp.lpSum(dienst_vars) == benoetigte_personen, \ f"Bedarf_{tag}_{dienst.kuerzel}" def _add_fairness_constraints( self, prob: pulp.LpProblem, x: Dict[Tuple[str, date, Dienst], pulp.LpVariable], ziel_dienste: DefaultDict[str, DefaultDict[Dienst, float]], constraint_prefix: str ) -> Dict: """Erstellt Fairness-Variablen und fügt Fairness-Constraints hinzu Args: prob: Das LP-Problem x: Die Entscheidungsvariablen ziel_dienste: Die Zielverteilung der Dienste constraint_prefix: Präfix für Constraint-Namen ('lokal' oder 'global') Returns: Dictionary mit Fairness-Abweichungsvariablen """ # Hilfsvariablen für Fairness-Abweichungen erstellen fairness_abweichung = {} for eltern in self.eltern: for dienst in self.dienste: fairness_abweichung[eltern, dienst] = pulp.LpVariable( f"fair_{constraint_prefix}_{eltern.replace(' ', '_')}_{dienst.kuerzel}", lowBound=0) # Fairness-Constraints hinzufügen for eltern in self.eltern: for dienst in self.dienste: # Tatsächliche Dienste im aktuellen Planungszeitraum zugeteilte_dienste_planungszeitraum = pulp.lpSum( x[eltern, tag, dienst] for tag in self.planungszeitraum if (eltern, tag, dienst) in x ) # Ziel für diese Fairness-Variante ziel = ziel_dienste[eltern][dienst] prob += (zugeteilte_dienste_planungszeitraum - ziel <= fairness_abweichung[eltern, dienst]) prob += (ziel - zugeteilte_dienste_planungszeitraum <= fairness_abweichung[eltern, dienst]) return fairness_abweichung def _add_constraint_gesamtfairness( self, prob: pulp.LpProblem, x: Dict[Tuple[str, date, Dienst], pulp.LpVariable], ziel_dienste: DefaultDict[str, DefaultDict[Dienst, float]], constraint_prefix: str ) -> Dict: """F3: Dienstübergreifende Fairness - verhindert Häufung bei einzelnen Eltern Berechnet die Abweichung der Gesamtdienstanzahl (über alle Diensttypen) vom fairen Gesamtziel. Dies verhindert, dass einzelne Eltern über alle Diensttypen hinweg überproportional viele Dienste bekommen. Args: prob: Das LP-Problem x: Die Entscheidungsvariablen ziel_dienste: Die Zielverteilung (global oder lokal) constraint_prefix: Präfix für Constraint-Namen ('lokal' oder 'global') Returns: Dictionary mit Gesamt-Fairness-Abweichungsvariablen """ fairness_abweichung_gesamt = {} for eltern in self.eltern: fairness_abweichung_gesamt[eltern] = pulp.LpVariable( f"fair_gesamt_{constraint_prefix}_{eltern.replace(' ', '_')}", lowBound=0) # Tatsächliche Gesamtdienste für diesen Elternteil tatsaechliche_dienste_gesamt = pulp.lpSum( x[eltern, tag, dienst] for tag in self.planungszeitraum for dienst in self.dienste if (eltern, tag, dienst) in x ) # Ziel-Gesamtdienste für diesen Elternteil (Summe über alle Dienste) ziel_gesamt = sum(ziel_dienste[eltern][dienst] for dienst in self.dienste) # Fairness-Constraints prob += (tatsaechliche_dienste_gesamt - ziel_gesamt <= fairness_abweichung_gesamt[eltern]) prob += (ziel_gesamt - tatsaechliche_dienste_gesamt <= fairness_abweichung_gesamt[eltern]) return fairness_abweichung_gesamt def _erstelle_zielfunktion( self, prob: pulp.LpProblem, x: Dict[Tuple[str, date, Dienst], pulp.LpVariable], fairness_abweichung_lokal: Dict, fairness_abweichung_global: Dict, fairness_abweichung_gesamt_global: Dict, fairness_abweichung_gesamt_lokal: Dict ) -> None: """Erstellt die Zielfunktion mit Fairness und Präferenzen""" objective_terms = [] # Fairness-Gewichtung gewicht_global = 40 gewicht_lokal = 60 gewicht_f1 = gewicht_global gewicht_f2 = gewicht_lokal gewicht_f3_global = 0.25 * gewicht_global gewicht_f3_lokal = 0.25 * gewicht_lokal # Fairness-Terme zur Zielfunktion hinzufügen for eltern in self.eltern: for dienst in self.dienste: objective_terms.append(gewicht_f1 * fairness_abweichung_global[eltern, dienst]) objective_terms.append(gewicht_f2 * fairness_abweichung_lokal[eltern, dienst]) # F3: Gesamtfairness (dienstübergreifend) - global und lokal objective_terms.append(gewicht_f3_global * fairness_abweichung_gesamt_global[eltern]) objective_terms.append(gewicht_f3_lokal * fairness_abweichung_gesamt_lokal[eltern]) # P1: Bevorzugte Dienste (positiv belohnen) for (eltern, tag, dienst), präf in self.präferenzen.items(): if (eltern, tag, dienst) in x and präf == 1: # bevorzugt objective_terms.append(-5 * x[eltern, tag, dienst]) # P2: Abgelehnte Dienste (bestrafen) for (eltern, tag, dienst), präf in self.präferenzen.items(): if (eltern, tag, dienst) in x and präf == -1: # abgelehnt objective_terms.append(25 * x[eltern, tag, dienst]) # Zielfunktion setzen if objective_terms: prob += pulp.lpSum(objective_terms) else: # Fallback: Minimiere Gesamtanzahl Dienste prob += pulp.lpSum([var for var in x.values()]) print(f"Verwende Gewichtung: F1 (global) = {gewicht_f1}, F2 (lokal) = {gewicht_f2}, " f"F3_global = {gewicht_f3_global}, F3_lokal = {gewicht_f3_lokal}") def erstelle_optimierungsmodell(self) -> Tuple[ pulp.LpProblem, Dict[Tuple[str, date, Dienst], pulp.LpVariable], DefaultDict[str, DefaultDict[Dienst, float]], DefaultDict[str, DefaultDict[Dienst, float]] ]: """Erstellt das PuLP Optimierungsmodell Returns: Tuple mit (prob, x, ziel_dienste_lokal, ziel_dienste_global) """ print("Erstelle Optimierungsmodell...") # Debugging: Verfügbarkeit prüfen print("\nDebug: Verfügbarkeit analysieren...") for tag in self.planungszeitraum[:5]: # Erste 5 Tage verfügbare = [e for e in self.eltern if self.verfügbarkeit.get((e, tag), True)] benötigte = self.benoetigte_dienste.get(tag, []) print(f" {tag}: Benötigt {len(benötigte)} Dienste {benötigte}, verfügbar: {verfügbare}") # LP Problem erstellen prob = pulp.LpProblem("Elterndienstplaner", pulp.LpMinimize) # Entscheidungsvariablen erstellen x = self._erstelle_entscheidungsvariablen() # Grundlegende Constraints hinzufügen self._add_constraint_ein_dienst_pro_woche(prob, x) self._add_constraint_ein_dienst_pro_tag(prob, x) self._add_constraint_verfuegbarkeit(prob, x) self._add_constraint_dienst_bedarf(prob, x) # Fairness-Constraints ziel_dienste_global = self.berechne_faire_zielverteilung_global() ziel_dienste_lokal = self.berechne_faire_zielverteilung_lokal() # F2: Lokale Fairness-Constraints fairness_abweichung_lokal = self._add_fairness_constraints( prob, x, ziel_dienste_lokal, "lokal" ) # F1: Globale Fairness-Constraints fairness_abweichung_global = self._add_fairness_constraints( prob, x, ziel_dienste_global, "global" ) # F3: Dienstübergreifende Fairness - Global fairness_abweichung_gesamt_global = self._add_constraint_gesamtfairness( prob, x, ziel_dienste_global, "global" ) # F3: Dienstübergreifende Fairness - Lokal fairness_abweichung_gesamt_lokal = self._add_constraint_gesamtfairness( prob, x, ziel_dienste_lokal, "lokal" ) # Zielfunktion erstellen self._erstelle_zielfunktion(prob, x, fairness_abweichung_lokal, fairness_abweichung_global, fairness_abweichung_gesamt_global, fairness_abweichung_gesamt_lokal) print(f"Modell erstellt mit {len(x)} Variablen und {len(prob.constraints)} Constraints") return prob, x, ziel_dienste_lokal, ziel_dienste_global def löse_optimierung(self, prob: pulp.LpProblem, x: Dict[Tuple[str, date, Dienst], pulp.LpVariable]) -> Optional[Dict[date, Dict[Dienst, List[str]]]]: """Löst das Optimierungsproblem""" print("Löse Optimierungsproblem...") # Solver wählen (verfügbare Solver testen) solver = None try: print("Versuche CBC Solver...") solver = pulp.PULP_CBC_CMD(msg=0, timeLimit=10) # Standard CBC Solver except: try: print("Versuche GLPK Solver...") solver = pulp.GLPK_CMD(msg=0) # GLPK falls verfügbar except: print("Kein spezifizierter Solver verfügbar, verwende Standard.") solver = None # Default Solver prob.solve(solver) status = pulp.LpStatus[prob.status] print(f"Optimierung abgeschlossen: {status}") if prob.status != pulp.LpStatusOptimal: print("WARNUNG: Keine optimale Lösung gefunden!") return None # Lösung extrahieren lösung: Dict[date, Dict[Dienst, List[str]]] = {} for (eltern, tag, dienst), var in x.items(): if var.varValue and var.varValue > 0.5: # Binary variable ist 1 if tag not in lösung: lösung[tag] = {} if dienst not in lösung[tag]: lösung[tag][dienst] = [] lösung[tag][dienst].append(eltern) return lösung def schreibe_ausgabe_csv(self, datei: str, lösung: Dict[date, Dict[Dienst, List[str]]]) -> None: """Schreibt die Lösung in die ausgabe.csv""" AusgabeWriter.schreibe_ausgabe_csv(datei, lösung, self.planungszeitraum, self.dienste) def drucke_statistiken(self, lösung: Dict[date, Dict[Dienst, List[str]]]) -> None: """Druckt Statistiken zur Lösung""" print("\n" + "="*50) print("STATISTIKEN") print("="*50) # Dienste pro Eltern zählen dienste_pro_eltern = defaultdict(lambda: defaultdict(int)) for tag, tag_dienste in lösung.items(): for dienst, eltern_liste in tag_dienste.items(): for eltern in eltern_liste: dienste_pro_eltern[eltern][dienst] += 1 # Gesamtübersicht print("\nDienste pro Eltern:") for eltern in sorted(self.eltern): gesamt = sum(dienste_pro_eltern[eltern].values()) dienste_detail = ', '.join(f"{dienst.kuerzel}:{dienste_pro_eltern[eltern][dienst]}" for dienst in self.dienste if dienste_pro_eltern[eltern][dienst] > 0) print(f" {eltern:15} {gesamt:3d} ({dienste_detail})") # Dienstfaktor-Analyse print(f"\nDienstfaktoren im Planungszeitraum:") for eltern in sorted(self.eltern): faktor_summe = sum(self.dienstfaktoren[eltern][tag] for tag in self.planungszeitraum) print(f" {eltern:15} {faktor_summe:.1f}") def visualisiere_praeferenz_verletzungen( self, lösung: Dict[date, Dict[Dienst, List[str]]] ) -> None: """Visualisiert verletzte Präferenzen als Tabelle Args: lösung: Die tatsächliche Lösung nach Optimierung """ print("\n" + "="*110) print("PRÄFERENZ-VERLETZUNGEN") print("="*110) # Sammle alle zugeteilten Dienste pro Eltern zugeteilte_dienste = defaultdict(lambda: defaultdict(list)) # eltern -> dienst -> [dates] for tag, tag_dienste in lösung.items(): for dienst, eltern_liste in tag_dienste.items(): for eltern in eltern_liste: zugeteilte_dienste[eltern][dienst].append(tag) # Sammle Präferenzen strukturiert # praeferenzen_pro_eltern_dienst[eltern][dienst] = {datum: präf_wert} praeferenzen_pro_eltern_dienst = defaultdict(lambda: defaultdict(dict)) for (eltern, tag, dienst), präf in self.präferenzen.items(): praeferenzen_pro_eltern_dienst[eltern][dienst][tag] = präf # Berechne Verletzungen verletzungen = defaultdict(lambda: defaultdict(lambda: {'negativ': 0, 'positiv_nicht_erfuellt': 0})) for eltern in sorted(self.eltern): for dienst in self.dienste: zugeteilte_tage = zugeteilte_dienste[eltern][dienst] praeferenzen_dienst = praeferenzen_pro_eltern_dienst[eltern][dienst] if not zugeteilte_tage: continue # Keine Dienste zugeteilt # a) Negative Präferenzen die verletzt wurden for tag in zugeteilte_tage: if tag in praeferenzen_dienst and praeferenzen_dienst[tag] == -1: verletzungen[eltern][dienst]['negativ'] += 1 # b) Positive Präferenzen nicht erfüllt (Dienst an nicht-präferiertem Tag) # Sammle alle Tage mit positiver Präferenz für diesen Dienst positive_praef_tage = {tag for tag, präf in praeferenzen_dienst.items() if präf == 1} if positive_praef_tage: # Es gibt positive Präferenzen # Prüfe ob ALLE zugeteilten Dienste an nicht-präferierten Tagen sind for tag in zugeteilte_tage: if tag not in positive_praef_tage: # Dienst wurde an nicht-präferiertem Tag zugeteilt verletzungen[eltern][dienst]['positiv_nicht_erfuellt'] += 1 # Tabelle ausgeben print(f"\n{'Eltern':<20} ", end='') for dienst in self.dienste: print(f"{dienst.kuerzel:>12}", end='') print() print(f"{'':20} ", end='') for dienst in self.dienste: print(f"{'neg, pos':>12}", end='') print() print("-" * (20 + 12 * len(self.dienste))) gesamt_negativ = defaultdict(int) gesamt_positiv = defaultdict(int) for eltern in sorted(self.eltern): print(f"{eltern:<20} ", end='') for dienst in self.dienste: neg = verletzungen[eltern][dienst]['negativ'] pos = verletzungen[eltern][dienst]['positiv_nicht_erfuellt'] gesamt_negativ[dienst] += neg gesamt_positiv[dienst] += pos # Farbcodierung farbe = "" reset = "" if neg > 0 or pos > 0: farbe = "\033[91m" if neg > 0 else "\033[93m" # Rot für negativ, Gelb für positiv reset = "\033[0m" print(f"{farbe}{neg:>3}, {pos:>3}{reset:>6}", end='') print() # Summenzeile print("-" * (20 + 12 * len(self.dienste))) print(f"{'SUMME':<20} ", end='') for dienst in self.dienste: neg = gesamt_negativ[dienst] pos = gesamt_positiv[dienst] farbe = "" reset = "" if neg > 0 or pos > 0: farbe = "\033[91m" if neg > 0 else "\033[93m" reset = "\033[0m" print(f"{farbe}{neg:>3}, {pos:>3}{reset:>6}", end='') print() print("\nLegende:") print(" neg = Anzahl negativer Präferenzen (abgelehnte Tage), die verletzt wurden") print(" pos = Anzahl Dienste an nicht-präferierten Tagen (obwohl präferierte Tage angegeben waren)") print(" \033[91mRot\033[0m = Negative Präferenz verletzt") print(" \033[93mGelb\033[0m = Positive Präferenz nicht erfüllt") def visualisiere_verteilungen( self, lösung: Dict[date, Dict[Dienst, List[str]]], ziel_lokal: DefaultDict[str, DefaultDict[Dienst, float]], ziel_global: DefaultDict[str, DefaultDict[Dienst, float]] ) -> None: """Visualisiert die Verteilungen als Tabelle zum Vergleich Args: lösung: Die tatsächliche Lösung nach Optimierung ziel_lokal: Lokale Zielverteilung (nur aktueller Planungszeitraum) ziel_global: Globale Zielverteilung (inkl. Historie) """ # Tatsächliche Dienste zählen tatsaechlich = defaultdict(lambda: defaultdict(int)) for tag_dienste in lösung.values(): for dienst, eltern_liste in tag_dienste.items(): for eltern in eltern_liste: tatsaechlich[eltern][dienst] += 1 print("\n" + "="*110) print("VERTEILUNGSVERGLEICH: SOLL vs. IST") print("="*110) for dienst in self.dienste: print(f"\n{dienst.name} ({dienst.kuerzel}):") print(f"{'Eltern':<20} {'Ziel Global':>12} {'Ziel Lokal':>12} {'Tatsächlich':>12} " f"{'Δ Global':>12} {'Δ Lokal':>12}") print("-" * 110) for eltern in sorted(self.eltern): z_global = ziel_global[eltern][dienst] z_lokal = ziel_lokal[eltern][dienst] ist = tatsaechlich[eltern][dienst] delta_global = ist - z_global delta_lokal = ist - z_lokal # Farbcodierung für Abweichungen (ANSI-Codes) farbe_global = "" farbe_lokal = "" reset = "" if abs(delta_global) > 0.5: farbe_global = "\033[93m" if abs(delta_global) <= 1.0 else "\033[91m" # Gelb oder Rot reset = "\033[0m" if abs(delta_lokal) > 0.5: farbe_lokal = "\033[93m" if abs(delta_lokal) <= 1.0 else "\033[91m" reset = "\033[0m" print(f"{eltern:<20} {z_global:>12.2f} {z_lokal:>12.2f} {ist:>12} " f"{farbe_global}{delta_global:>+12.2f}{reset} {farbe_lokal}{delta_lokal:>+12.2f}{reset}") # Summen summe_z_global = sum(ziel_global[e][dienst] for e in self.eltern) summe_z_lokal = sum(ziel_lokal[e][dienst] for e in self.eltern) summe_ist = sum(tatsaechlich[e][dienst] for e in self.eltern) print("-" * 110) print(f"{'SUMME':<20} {summe_z_global:>12.2f} {summe_z_lokal:>12.2f} {summe_ist:>12} " f"{summe_ist - summe_z_global:>+12.2f} {summe_ist - summe_z_lokal:>+12.2f}") # Gesamtstatistik print("\n" + "="*110) print("ZUSAMMENFASSUNG") print("="*110) # Maximale Abweichungen finden max_abw_global = 0 max_abw_lokal = 0 for eltern in self.eltern: for dienst in self.dienste: ist = tatsaechlich[eltern][dienst] max_abw_global = max(max_abw_global, abs(ist - ziel_global[eltern][dienst])) max_abw_lokal = max(max_abw_lokal, abs(ist - ziel_lokal[eltern][dienst])) print(f"Maximale Abweichung von Global-Ziel: {max_abw_global:.2f} Dienste") print(f"Maximale Abweichung von Lokal-Ziel: {max_abw_lokal:.2f} Dienste") print("\nLegende: Δ = Tatsächlich - Ziel (positiv = mehr als Ziel, negativ = weniger als Ziel)") def main() -> None: if len(sys.argv) < 4: print("Usage: ./elterndienstplaner.py []") sys.exit(1) eingabe_datei = sys.argv[1] eltern_datei = sys.argv[2] ausgabe_datei = sys.argv[3] vorherige_datei = sys.argv[4] if len(sys.argv) > 4 else None print("Elterndienstplaner gestartet") print("="*50) try: planer = Elterndienstplaner() # Daten laden planer.lade_eingabe_csv(eingabe_datei) planer.lade_eltern_csv(eltern_datei) if vorherige_datei: planer.lade_vorherige_ausgaben_csv(vorherige_datei) # Optimierung prob, x, ziel_lokal, ziel_global = planer.erstelle_optimierungsmodell() lösung = planer.löse_optimierung(prob, x) if lösung is not None: # Ergebnisse ausgeben planer.schreibe_ausgabe_csv(ausgabe_datei, lösung) planer.drucke_statistiken(lösung) # Visualisierung der Verteilungen planer.visualisiere_verteilungen(lösung, ziel_lokal, ziel_global) # Visualisierung der Präferenz-Verletzungen planer.visualisiere_praeferenz_verletzungen(lösung) print("\n✓ Planung erfolgreich abgeschlossen!") else: print("\n✗ Fehler: Keine gültige Lösung gefunden!") sys.exit(1) except Exception as e: print(f"\n✗ Fehler: {e}") sys.exit(1) if __name__ == "__main__": main()