#!/usr/bin/env python3 """ Elterndienstplaner - Optimale Zuteilung von Elterndiensten Autor: Automatisch generiert Datum: Dezember 2025 """ import sys import pulp from datetime import timedelta, date from collections import defaultdict from typing import Dict, List, Tuple, DefaultDict, Optional from csv_io import EingabeParser, AusgabeWriter class Dienst: """Repräsentiert einen Diensttyp mit allen seinen Eigenschaften""" def __init__(self, kuerzel: str, name: str, personen_anzahl: int = 1) -> None: self.kuerzel: str = kuerzel self.name: str = name self.personen_anzahl: int = personen_anzahl def __str__(self) -> str: return f"{self.kuerzel} ({self.name}): {self.personen_anzahl} Person(en)" def __repr__(self) -> str: return f"Dienst('{self.kuerzel}', '{self.name}', {self.personen_anzahl})" def braucht_mehrere_personen(self) -> bool: """Gibt True zurück, wenn mehr als eine Person benötigt wird""" return self.personen_anzahl > 1 class Elterndienstplaner: def __init__(self) -> None: # Dienste als Liste definieren self.dienste: List[Dienst] = [ Dienst('F', 'Frühstücksdienst', 1), Dienst('P', 'Putznotdienst', 1), Dienst('E', 'Essensausgabenotdienst', 1), Dienst('K', 'Kochen', 1), Dienst('A', 'Elternabend', 2) ] # Datenstrukturen self.tage: List[date] = [] self.eltern: List[str] = [] self.benoetigte_dienste: Dict[date, List[Dienst]] = {} self.verfügbarkeit: Dict[Tuple[str, date], bool] = {} self.präferenzen: Dict[Tuple[str, date, Dienst], int] = {} self.dienstfaktoren: Dict[str, Dict[date, float]] = {} self.alle_zeitraeume: Dict[str, List[Tuple[date, date, float]]] = {} self.vorherige_dienste: DefaultDict[str, DefaultDict[Dienst, int]] = \ defaultdict(lambda: defaultdict(int)) self.historische_dienste: List[Tuple[date, str, Dienst]] = [] def get_dienst(self, kuerzel: str) -> Optional[Dienst]: """Gibt das Dienst-Objekt für ein Kürzel zurück""" for dienst in self.dienste: if dienst.kuerzel == kuerzel: return dienst return None def add_dienst(self, kuerzel: str, name: str, personen_anzahl: int = 1) -> Dienst: """Fügt einen neuen Dienst hinzu""" dienst = Dienst(kuerzel, name, personen_anzahl) self.dienste.append(dienst) return dienst def print_dienste_info(self) -> None: """Druckt Informationen über alle konfigurierten Dienste""" print("Konfigurierte Dienste:") for dienst in self.dienste: print(f" {dienst}") def lade_eingabe_csv(self, datei: str) -> None: """Lädt die eingabe.csv mit Terminen und Präferenzen""" self.eltern, self.tage, self.benoetigte_dienste, self.verfügbarkeit, self.präferenzen = \ EingabeParser.parse_eingabe_csv(datei, self.get_dienst) def lade_eltern_csv(self, datei: str) -> None: """Lädt die eltern.csv mit Dienstfaktoren""" self.dienstfaktoren, self.alle_zeitraeume = \ EingabeParser.parse_eltern_csv(datei, self.tage) def lade_vorherige_ausgaben_csv(self, datei: str) -> None: """Lädt vorherige-ausgaben.csv für Fairness-Constraints""" self.vorherige_dienste, self.historische_dienste = \ EingabeParser.parse_vorherige_ausgaben_csv(datei, self.eltern, self.dienste) def berechne_dienstfaktor_an_datum(self, eltern: str, datum: date) -> float: """Berechnet den Dienstfaktor eines Elternteils an einem bestimmten Datum""" if eltern not in self.alle_zeitraeume: return 0 for beginn, ende, faktor in self.alle_zeitraeume[eltern]: if beginn <= datum <= ende: return faktor return 0 def berechne_faire_zielverteilung_global(self) -> DefaultDict[str, DefaultDict[Dienst, float]]: """Berechnet die faire Zielanzahl von Diensten pro Eltern-Dienst-Kombination basierend auf tatsächlich geleisteten historischen Diensten und deren fairer Umverteilung""" ziel_dienste: DefaultDict[str, DefaultDict[Dienst, float]] = \ defaultdict(lambda: defaultdict(float)) print("\nBerechne faire Zielverteilung basierend auf historischen Daten...") # Historische Dienste nach Datum gruppieren historische_tage = set(datum for datum, _, _ in self.historische_dienste) if self.historische_dienste else set() print(f" Analysiere {len(historische_tage)} historische Tage mit {len(self.historische_dienste)} Diensten") for dienst in self.dienste: print(f" Verarbeite Dienst {dienst.kuerzel}...") # 1. HISTORISCHE PERIODE: Faire Umverteilung der tatsächlich geleisteten Dienste historische_dienste_dieses_typs = [ (datum, eltern) for datum, eltern, d in self.historische_dienste if d == dienst ] print(f" Gefundene historische {dienst.kuerzel}-Dienste: {len(historische_dienste_dieses_typs)}") # Gruppiere nach Datum dienste_pro_tag = defaultdict(list) for datum, eltern in historische_dienste_dieses_typs: dienste_pro_tag[datum].append(eltern) # Für jeden historischen Tag faire Umverteilung berechnen for hist_datum, geleistete_eltern in dienste_pro_tag.items(): anzahl_dienste = len(geleistete_eltern) # Anzahl Dienste an diesem Tag # Dienstfaktoren aller Eltern für diesen historischen Tag berechnen dienstfaktoren_tag = {} gesamt_dienstfaktor_tag = 0 for eltern in self.eltern: faktor = self.berechne_dienstfaktor_an_datum(eltern, hist_datum) dienstfaktoren_tag[eltern] = faktor gesamt_dienstfaktor_tag += faktor # Faire Umverteilung der an diesem Tag geleisteten Dienste if gesamt_dienstfaktor_tag > 0: for eltern in self.eltern: if dienstfaktoren_tag[eltern] > 0: anteil = dienstfaktoren_tag[eltern] / gesamt_dienstfaktor_tag faire_zuteilung = anteil * anzahl_dienste ziel_dienste[eltern][dienst] += faire_zuteilung if faire_zuteilung > 0.01: # Debug nur für relevante Werte print(f" {hist_datum}: {eltern} Faktor={dienstfaktoren_tag[eltern]} " f"-> {faire_zuteilung:.2f} von {anzahl_dienste} Diensten") # 2. AKTUELLER MONAT: Faire Verteilung der benötigten Dienste benoetigte_dienste_monat = sum( 1 for tag in self.tage if dienst in self.benoetigte_dienste.get(tag, []) ) # Multipliziere mit Anzahl benötigter Personen pro Dienst benoetigte_dienste_monat *= dienst.personen_anzahl if benoetigte_dienste_monat > 0: # Gesamtdienstfaktor für aktuellen Monat gesamt_dienstfaktor_monat = sum( sum(self.dienstfaktoren.get(e, {}).get(tag, 0) for tag in self.tage) for e in self.eltern ) if gesamt_dienstfaktor_monat > 0: for eltern in self.eltern: monatsfaktor = sum( self.dienstfaktoren.get(eltern, {}).get(tag, 0) for tag in self.tage ) if monatsfaktor > 0: anteil = monatsfaktor / gesamt_dienstfaktor_monat faire_zuteilung = anteil * benoetigte_dienste_monat ziel_dienste[eltern][dienst] += faire_zuteilung # Debug-Ausgabe für diesen Dienst total_historisch = sum( ziel_dienste[e][dienst] - ( sum(self.dienstfaktoren.get(e, {}).get(tag, 0) for tag in self.tage) / sum(sum(self.dienstfaktoren.get(e2, {}).get(tag, 0) for tag in self.tage) for e2 in self.eltern) * benoetigte_dienste_monat if sum(sum(self.dienstfaktoren.get(e2, {}).get(tag, 0) for tag in self.tage) for e2 in self.eltern) > 0 else 0 ) for e in self.eltern ) if len(historische_dienste_dieses_typs) > 0 else 0 print(f" {dienst.kuerzel}: Historisch faire Summe={total_historisch:.1f}, " f"Aktuell benötigt={benoetigte_dienste_monat}") # Debug-Output: Detaillierte Zielverteilung print("\n Berechnete Zielverteilung (basierend auf tatsächlichen historischen Diensten):") for eltern in sorted(self.eltern): for dienst in self.dienste: if ziel_dienste[eltern][dienst] > 0.1: # Nur relevante Werte ist = self.vorherige_dienste[eltern][dienst] ziel = ziel_dienste[eltern][dienst] print(f" {eltern} {dienst.kuerzel}: IST={ist}, FAIRE_ZIEL={ziel:.2f}, DIFF={ziel-ist:.2f}") return ziel_dienste def berechne_faire_zielverteilung_lokal(self) -> DefaultDict[str, DefaultDict[Dienst, float]]: """Berechnet die lokale faire Zielanzahl von Diensten pro Eltern-Dienst-Kombination basierend auf Dienstfaktoren und benötigten Diensten im aktuellen Planungsmonat""" ziel_dienste_lokal: DefaultDict[str, DefaultDict[Dienst, float]] = \ defaultdict(lambda: defaultdict(float)) print("\nBerechne lokale faire Zielverteilung für aktuellen Monat...") # Gesamtdienstfaktor für aktuellen Monat berechnen gesamt_dienstfaktor_monat = sum( sum(self.dienstfaktoren.get(e, {}).get(tag, 0) for tag in self.tage) for e in self.eltern ) if gesamt_dienstfaktor_monat == 0: print(" WARNUNG: Gesamtdienstfaktor ist 0, keine lokale Zielverteilung möglich") return ziel_dienste_lokal # Für jeden Dienst die lokale faire Verteilung berechnen for dienst in self.dienste: # Anzahl benötigter Dienste im aktuellen Monat benoetigte_dienste_monat = sum( 1 for tag in self.tage if dienst in self.benoetigte_dienste.get(tag, []) ) # Multipliziere mit Anzahl benötigter Personen pro Dienst benoetigte_dienste_monat *= dienst.personen_anzahl if benoetigte_dienste_monat > 0: print(f" {dienst.kuerzel}: {benoetigte_dienste_monat} Dienste benötigt") for eltern in self.eltern: # Dienstfaktor für diesen Elternteil im aktuellen Monat monatlicher_dienstfaktor = sum( self.dienstfaktoren.get(eltern, {}).get(tag, 0) for tag in self.tage ) if monatlicher_dienstfaktor > 0: anteil = monatlicher_dienstfaktor / gesamt_dienstfaktor_monat faire_zuteilung = anteil * benoetigte_dienste_monat ziel_dienste_lokal[eltern][dienst] = faire_zuteilung if faire_zuteilung > 0.1: # Debug nur für relevante Werte print(f" {eltern}: Faktor={monatlicher_dienstfaktor:.1f} " f"-> {faire_zuteilung:.2f} Dienste") return ziel_dienste_lokal def erstelle_optimierungsmodell(self) -> Tuple[pulp.LpProblem, Dict[Tuple[str, date, Dienst], pulp.LpVariable]]: """Erstellt das PuLP Optimierungsmodell""" print("Erstelle Optimierungsmodell...") # Debugging: Verfügbarkeit prüfen print("\nDebug: Verfügbarkeit analysieren...") for tag in self.tage[:5]: # Erste 5 Tage verfügbare = [e for e in self.eltern if self.verfügbarkeit.get((e, tag), True)] benötigte = self.benoetigte_dienste.get(tag, []) print(f" {tag}: Benötigt {len(benötigte)} Dienste {benötigte}, verfügbar: {verfügbare}") # LP Problem erstellen prob = pulp.LpProblem("Elterndienstplaner", pulp.LpMinimize) # Entscheidungsvariablen: x[eltern, tag, dienst] ∈ {0,1} x: Dict[Tuple[str, date, Dienst], pulp.LpVariable] = {} for eltern in self.eltern: for tag in self.tage: for dienst in self.dienste: if dienst in self.benoetigte_dienste.get(tag, []): x[eltern, tag, dienst] = pulp.LpVariable( f"x_{eltern.replace(' ', '_')}_{tag}_{dienst.kuerzel}", cat='Binary' ) # Vereinfachtes Modell: Grundlegende Constraints # C1: Je Eltern und Dienst nur einmal die Woche woche_start = self.tage[0] woche_nr = 0 while woche_start <= self.tage[-1]: woche_ende = min(woche_start + timedelta(days=6), self.tage[-1]) woche_tage = [t for t in self.tage if woche_start <= t <= woche_ende] for eltern in self.eltern: for dienst in self.dienste: woche_vars = [] for tag in woche_tage: if (eltern, tag, dienst) in x: woche_vars.append(x[eltern, tag, dienst]) if woche_vars: prob += pulp.lpSum(woche_vars) <= 1, f"C1_{eltern.replace(' ', '_')}_{dienst.kuerzel}_w{woche_nr}" woche_start += timedelta(days=7) woche_nr += 1 # C2: Je Eltern nur einen Dienst am Tag for eltern in self.eltern: for tag in self.tage: tag_vars = [] for dienst in self.dienste: if (eltern, tag, dienst) in x: tag_vars.append(x[eltern, tag, dienst]) if tag_vars: prob += pulp.lpSum(tag_vars) <= 1, f"C2_{eltern.replace(' ', '_')}_{tag}" # C3: Dienste nur verfügbaren Eltern zuteilen for eltern in self.eltern: for tag in self.tage: if not self.verfügbarkeit.get((eltern, tag), True): for dienst in self.dienste: if (eltern, tag, dienst) in x: prob += x[eltern, tag, dienst] == 0, f"C3_{eltern.replace(' ', '_')}_{tag}_{dienst.kuerzel}" # Alle benötigten Dienste müssen zugeteilt werden (flexibel) for tag in self.tage: for dienst in self.benoetigte_dienste.get(tag, []): dienst_vars = [] verfuegbare_eltern = 0 for eltern in self.eltern: if (eltern, tag, dienst) in x: # Prüfe ob Eltern verfügbar if self.verfügbarkeit.get((eltern, tag), True): dienst_vars.append(x[eltern, tag, dienst]) verfuegbare_eltern += 1 if dienst_vars: # Anzahl benötigter Personen pro Dienst (aus Dienst-Objekt) benoetigte_personen = dienst.personen_anzahl prob += pulp.lpSum(dienst_vars) == benoetigte_personen, f"Bedarf_{tag}_{dienst.kuerzel}" # FAIRNESS-CONSTRAINTS UND ZIELFUNKTION objective_terms = [] # Berechne faire Zielverteilungen (global und lokal) ziel_dienste_global = self.berechne_faire_zielverteilung_global() ziel_dienste_lokal = self.berechne_faire_zielverteilung_lokal() # Hilfsvariablen für Fairness-Abweichungen fairness_abweichung_lokal = {} # F2 fairness_abweichung_global = {} # F1 for eltern in self.eltern: for dienst in self.dienste: fairness_abweichung_lokal[eltern, dienst] = pulp.LpVariable( f"fair_lokal_{eltern.replace(' ', '_')}_{dienst.kuerzel}", lowBound=0) fairness_abweichung_global[eltern, dienst] = pulp.LpVariable( f"fair_global_{eltern.replace(' ', '_')}_{dienst.kuerzel}", lowBound=0) # F1: Globale Fairness & F2: Lokale Fairness for eltern in self.eltern: for dienst in self.dienste: # Tatsächliche Dienste im aktuellen Monat tatsaechliche_dienste_monat = pulp.lpSum( x[eltern, tag, dienst] for tag in self.tage if (eltern, tag, dienst) in x ) # F2: Lokale Fairness - nur aktueller Monat ziel_lokal = ziel_dienste_lokal[eltern][dienst] if ziel_lokal > 0: # Lokale Fairness-Constraints prob += (tatsaechliche_dienste_monat - ziel_lokal <= fairness_abweichung_lokal[eltern, dienst]) prob += (ziel_lokal - tatsaechliche_dienste_monat <= fairness_abweichung_lokal[eltern, dienst]) # F1: Globale Fairness - basierend auf berechneter Zielverteilung ziel_global = ziel_dienste_global[eltern][dienst] vorherige_dienste = self.vorherige_dienste[eltern][dienst] if ziel_global > 0: # Tatsächliche Dienste global (Vergangenheit + geplant) total_dienste_inkl_vergangenheit = tatsaechliche_dienste_monat + vorherige_dienste # Globale Fairness-Constraints prob += (total_dienste_inkl_vergangenheit - ziel_global <= fairness_abweichung_global[eltern, dienst]) prob += (ziel_global - total_dienste_inkl_vergangenheit <= fairness_abweichung_global[eltern, dienst]) # Gewichtung: Jahresanfang F1 stärker, Jahresende F2 stärker # Annahme: September = Jahresanfang, Juli = Jahresende aktueller_monat = self.tage[0].month if self.tage else 1 if 9 <= aktueller_monat <= 12: # Sep-Dez: Jahresanfang gewicht_f1 = 100 # Global wichtiger gewicht_f2 = 50 # Lokal weniger wichtig elif 1 <= aktueller_monat <= 3: # Jan-Mar: Jahresmitte gewicht_f1 = 75 gewicht_f2 = 75 else: # Apr-Jul: Jahresende gewicht_f1 = 50 # Global weniger wichtig gewicht_f2 = 100 # Lokal wichtiger # Fairness-Terme zur Zielfunktion hinzufügen for eltern in self.eltern: for dienst in self.dienste: objective_terms.append(gewicht_f1 * fairness_abweichung_global[eltern, dienst]) objective_terms.append(gewicht_f2 * fairness_abweichung_lokal[eltern, dienst]) # P1: Bevorzugte Dienste (positiv belohnen) for (eltern, tag, dienst), präf in self.präferenzen.items(): if (eltern, tag, dienst) in x and präf == 1: # bevorzugt objective_terms.append(-5 * x[eltern, tag, dienst]) # Schwächer als Fairness # P2: Abgelehnte Dienste (bestrafen) for (eltern, tag, dienst), präf in self.präferenzen.items(): if (eltern, tag, dienst) in x and präf == -1: # abgelehnt objective_terms.append(25 * x[eltern, tag, dienst]) # Schwächer als Fairness # Zielfunktion setzen if objective_terms: prob += pulp.lpSum(objective_terms) else: # Fallback: Minimiere Gesamtanzahl Dienste prob += pulp.lpSum([var for var in x.values()]) print(f"Verwende Gewichtung: F1 (global) = {gewicht_f1}, F2 (lokal) = {gewicht_f2}") print(f"Modell erstellt mit {len(x)} Variablen und {len(prob.constraints)} Constraints") return prob, x def löse_optimierung(self, prob: pulp.LpProblem, x: Dict[Tuple[str, date, Dienst], pulp.LpVariable]) -> Optional[Dict[date, Dict[Dienst, List[str]]]]: """Löst das Optimierungsproblem""" print("Löse Optimierungsproblem...") # Solver wählen (verfügbare Solver testen) solver = None try: print("Versuche CBC Solver...") solver = pulp.PULP_CBC_CMD(msg=0, timeLimit=10) # Standard CBC Solver except: try: print("Versuche GLPK Solver...") solver = pulp.GLPK_CMD(msg=0) # GLPK falls verfügbar except: print("Kein spezifizierter Solver verfügbar, verwende Standard.") solver = None # Default Solver prob.solve(solver) status = pulp.LpStatus[prob.status] print(f"Optimierung abgeschlossen: {status}") if prob.status != pulp.LpStatusOptimal: print("WARNUNG: Keine optimale Lösung gefunden!") return None # Lösung extrahieren lösung: Dict[date, Dict[Dienst, List[str]]] = {} for (eltern, tag, dienst), var in x.items(): if var.varValue and var.varValue > 0.5: # Binary variable ist 1 if tag not in lösung: lösung[tag] = {} if dienst not in lösung[tag]: lösung[tag][dienst] = [] lösung[tag][dienst].append(eltern) return lösung def schreibe_ausgabe_csv(self, datei: str, lösung: Dict[date, Dict[Dienst, List[str]]]) -> None: """Schreibt die Lösung in die ausgabe.csv""" AusgabeWriter.schreibe_ausgabe_csv(datei, lösung, self.tage, self.dienste) def drucke_statistiken(self, lösung: Dict[date, Dict[Dienst, List[str]]]) -> None: """Druckt Statistiken zur Lösung""" print("\n" + "="*50) print("STATISTIKEN") print("="*50) # Dienste pro Eltern zählen dienste_pro_eltern = defaultdict(lambda: defaultdict(int)) for tag, tag_dienste in lösung.items(): for dienst, eltern_liste in tag_dienste.items(): for eltern in eltern_liste: dienste_pro_eltern[eltern][dienst] += 1 # Gesamtübersicht print("\nDienste pro Eltern:") for eltern in sorted(self.eltern): gesamt = sum(dienste_pro_eltern[eltern].values()) dienste_detail = ', '.join(f"{dienst.kuerzel}:{dienste_pro_eltern[eltern][dienst]}" for dienst in self.dienste if dienste_pro_eltern[eltern][dienst] > 0) print(f" {eltern:15} {gesamt:3d} ({dienste_detail})") # Dienstfaktor-Analyse print(f"\nDienstfaktoren im Planungszeitraum:") for eltern in sorted(self.eltern): faktor_summe = sum(self.dienstfaktoren.get(eltern, {}).get(tag, 0) for tag in self.tage) print(f" {eltern:15} {faktor_summe:.1f}") def main() -> None: if len(sys.argv) < 4: print("Usage: ./elterndienstplaner.py []") sys.exit(1) eingabe_datei = sys.argv[1] eltern_datei = sys.argv[2] ausgabe_datei = sys.argv[3] vorherige_datei = sys.argv[4] if len(sys.argv) > 4 else None print("Elterndienstplaner gestartet") print("="*50) try: planer = Elterndienstplaner() # Daten laden planer.lade_eingabe_csv(eingabe_datei) planer.lade_eltern_csv(eltern_datei) if vorherige_datei: planer.lade_vorherige_ausgaben_csv(vorherige_datei) # Optimierung prob, x = planer.erstelle_optimierungsmodell() lösung = planer.löse_optimierung(prob, x) if lösung is not None: # Ergebnisse ausgeben planer.schreibe_ausgabe_csv(ausgabe_datei, lösung) planer.drucke_statistiken(lösung) print("\n✓ Planung erfolgreich abgeschlossen!") else: print("\n✗ Fehler: Keine gültige Lösung gefunden!") sys.exit(1) except Exception as e: print(f"\n✗ Fehler: {e}") sys.exit(1) if __name__ == "__main__": main()