#!/usr/bin/env python3 """ Elterndienstplaner - Optimale Zuteilung von Elterndiensten Autor: Automatisch generiert Datum: Dezember 2025 """ import sys import pulp from datetime import timedelta, date from collections import defaultdict from typing import Dict, List, Tuple, DefaultDict, Optional from datenmodell import ElterndienstplanerDaten, Dienst, Eltern, Zielverteilung, Entscheidungsvariablen from ausgabe import ElterndienstAusgabe class Elterndienstplaner: """Optimierungs-Engine für Elterndienstplanung""" def __init__(self, daten: ElterndienstplanerDaten, ausgabe: ElterndienstAusgabe) -> None: self.daten = daten self.ausgabe = ausgabe def berechne_faire_zielverteilung_global(self) -> Zielverteilung: """Berechnet die faire Zielanzahl von Diensten für den Planungszeitraum basierend auf globaler Fairness (Historie + aktueller Planungszeitraum). Gibt die Ziel-Dienstanzahl für den aktuellen Planungszeitraum zurück, korrigiert um bereits geleistete Dienste. Kann negativ sein, wenn bereits mehr Dienste geleistet wurden als fair wäre.""" ziel_dienste: Zielverteilung = defaultdict(lambda: defaultdict(float)) print("\nBerechne faire Zielverteilung basierend auf historischen Daten...") # Historische Dienste nach Datum gruppieren historische_tage = set(datum for datum, _, _ in self.daten.historische_dienste) if self.daten.historische_dienste else set() print(f" Analysiere {len(historische_tage)} historische Tage mit {len(self.daten.historische_dienste)} Diensten") for dienst in self.daten.dienste: print(f" Verarbeite Dienst {dienst.kuerzel}...") # 1. HISTORISCHE PERIODE: Faire Umverteilung der tatsächlich geleisteten Dienste historische_dienste_dieses_typs = [ (datum, eltern) for datum, eltern, d in self.daten.historische_dienste if d == dienst ] print(f" Gefundene historische {dienst.kuerzel}-Dienste: {len(historische_dienste_dieses_typs)}") # Gruppiere nach Datum dienste_pro_tag = defaultdict(list) for datum, eltern in historische_dienste_dieses_typs: dienste_pro_tag[datum].append(eltern) # Für jeden historischen Tag faire Umverteilung berechnen for tag, geleistete_eltern in dienste_pro_tag.items(): anzahl_dienste = len(geleistete_eltern) # Anzahl Dienste an diesem Tag # Dienstfaktoren aller Eltern für diesen historischen Tag berechnen gesamt_dienstfaktor_tag = 0 for eltern in self.daten.eltern: gesamt_dienstfaktor_tag += self.daten.dienstfaktoren[eltern][tag] # Faire Umverteilung der an diesem Tag geleisteten Dienste if gesamt_dienstfaktor_tag > 0: for eltern in self.daten.eltern: if self.daten.dienstfaktoren[eltern][tag] > 0: anteil = self.daten.dienstfaktoren[eltern][tag] / gesamt_dienstfaktor_tag faire_zuteilung = anteil * anzahl_dienste ziel_dienste[eltern][dienst] += faire_zuteilung if faire_zuteilung > 0.01: # Debug nur für relevante Werte print(f" {tag}: {eltern} Faktor={self.daten.dienstfaktoren[eltern][tag]} " f"-> {faire_zuteilung:.2f} von {anzahl_dienste} Diensten") # 2. AKTUELLER PLANUNGSZEITRAUM: Faire Verteilung der benötigten Dienste (tageweise wie bei historischen Diensten) benoetigte_dienste_planungszeitraum = 0 # Für jeden Tag im aktuellen Planungszeitraum faire Umverteilung berechnen for tag in self.daten.planungszeitraum: # Prüfe ob an diesem Tag der Dienst benötigt wird if dienst not in self.daten.benoetigte_dienste.get(tag, []): continue benoetigte_dienste_planungszeitraum += dienst.personen_anzahl # Dienstfaktoren aller Eltern für diesen Tag berechnen dienstfaktoren = {} gesamt_dienstfaktor_tag = 0 for eltern in self.daten.eltern: faktor = self.daten.dienstfaktoren[eltern][tag] dienstfaktoren[eltern] = faktor gesamt_dienstfaktor_tag += faktor # Faire Umverteilung der an diesem Tag benötigten Dienste if gesamt_dienstfaktor_tag > 0: for eltern in self.daten.eltern: anteil = dienstfaktoren[eltern] / gesamt_dienstfaktor_tag faire_zuteilung = anteil * dienst.personen_anzahl ziel_dienste[eltern][dienst] += faire_zuteilung # 3. ABZUG DER BEREITS GELEISTETEN DIENSTE # Ziehe die tatsächlich geleisteten Dienste ab, um das Ziel für den Planungszeitraum zu erhalten for eltern in self.daten.eltern: # Berechne vorherige Dienste on-the-fly aus historischen Diensten vorherige_anzahl = sum( 1 for _, hist_eltern, hist_dienst in self.daten.historische_dienste if hist_eltern == eltern and hist_dienst == dienst ) ziel_dienste[eltern][dienst] -= vorherige_anzahl return ziel_dienste def berechne_faire_zielverteilung_lokal(self) -> Zielverteilung: """Berechnet die lokale faire Zielanzahl von Diensten pro Eltern-Dienst-Kombination basierend auf Dienstfaktoren und benötigten Diensten im aktuellen Planungszeitraum""" ziel_dienste_lokal: Zielverteilung = defaultdict(lambda: defaultdict(float)) print("\nBerechne lokale faire Zielverteilung für aktuellen Planungszeitraum...") # Gesamtdienstfaktor für aktuellen Planungszeitraum berechnen summe_dienstfaktor_planungszeitraum_alle_eltern = sum( sum(self.daten.dienstfaktoren[e][tag] for tag in self.daten.planungszeitraum) for e in self.daten.eltern ) if summe_dienstfaktor_planungszeitraum_alle_eltern == 0: print(" WARNUNG: Gesamtdienstfaktor ist 0, keine lokale Zielverteilung möglich") return ziel_dienste_lokal # Für jeden Dienst die lokale faire Verteilung berechnen for dienst in self.daten.dienste: # Anzahl benötigter Dienste im aktuellen Planungszeitraum benoetigte_dienste_planungszeitraum = sum( 1 for tag in self.daten.planungszeitraum if dienst in self.daten.benoetigte_dienste.get(tag, []) ) # Multipliziere mit Anzahl benötigter Personen pro Dienst benoetigte_dienste_planungszeitraum *= dienst.personen_anzahl if benoetigte_dienste_planungszeitraum > 0: print(f" {dienst.kuerzel}: {benoetigte_dienste_planungszeitraum} Dienste benötigt") for eltern in self.daten.eltern: # Dienstfaktor für diesen Elternteil im aktuellen Planungszeitraum summe_dienstfaktor_planungszeitraum = sum( self.daten.dienstfaktoren[eltern][tag] for tag in self.daten.planungszeitraum ) if summe_dienstfaktor_planungszeitraum > 0: anteil = summe_dienstfaktor_planungszeitraum / summe_dienstfaktor_planungszeitraum_alle_eltern faire_zuteilung = anteil * benoetigte_dienste_planungszeitraum ziel_dienste_lokal[eltern][dienst] = faire_zuteilung return ziel_dienste_lokal def _erstelle_entscheidungsvariablen(self) -> Entscheidungsvariablen: """Erstellt die binären Entscheidungsvariablen x[eltern, tag, dienst]""" x: Entscheidungsvariablen = {} for eltern in self.daten.eltern: for tag in self.daten.planungszeitraum: for dienst in self.daten.dienste: if dienst in self.daten.benoetigte_dienste.get(tag, []): x[eltern, tag, dienst] = pulp.LpVariable( f"x_{eltern.replace(' ', '_')}_{tag}_{dienst.kuerzel}", cat='Binary' ) return x def _add_constraint_ein_dienst_pro_woche( self, prob: pulp.LpProblem, x: Entscheidungsvariablen ) -> None: """C1: Je Eltern und Dienst nur einmal die Woche (Woche = Montag bis Sonntag)""" erster_tag = self.daten.planungszeitraum[0] # weekday(): 0=Montag, 6=Sonntag # Finde Montag am oder vor dem ersten Planungstag (für historische Dienste) woche_start = erster_tag - timedelta(days=erster_tag.weekday()) woche_nr = 0 letzter_tag = self.daten.planungszeitraum[-1] while woche_start <= letzter_tag: woche_ende = woche_start + timedelta(days=6) # Sonntag for eltern in self.daten.eltern: for dienst in self.daten.dienste: woche_vars = [] # Zähle historische Dienste in dieser Woche (VOR dem Planungszeitraum) historische_dienste_in_woche = 0 if woche_start < erster_tag: for hist_datum, hist_eltern, hist_dienst in self.daten.historische_dienste: if (hist_eltern == eltern and hist_dienst == dienst and woche_start <= hist_datum < erster_tag): historische_dienste_in_woche += 1 # Sammle Variablen für Planungszeitraum in dieser Woche for tag in self.daten.planungszeitraum: if woche_start <= tag <= woche_ende: if (eltern, tag, dienst) in x: woche_vars.append(x[eltern, tag, dienst]) # Constraint: Historische + geplante Dienste <= 1 if woche_vars: prob += pulp.lpSum(woche_vars) <= 1 - historische_dienste_in_woche, \ f"C1_{eltern.replace(' ', '_')}_{dienst.kuerzel}_w{woche_nr}" woche_start += timedelta(days=7) woche_nr += 1 def _add_constraint_ein_dienst_pro_tag( self, prob: pulp.LpProblem, x: Entscheidungsvariablen ) -> None: """C2: Je Eltern nur einen Dienst am Tag""" for eltern in self.daten.eltern: for tag in self.daten.planungszeitraum: tag_vars = [] for dienst in self.daten.dienste: if (eltern, tag, dienst) in x: tag_vars.append(x[eltern, tag, dienst]) if tag_vars: prob += pulp.lpSum(tag_vars) <= 1, f"C2_{eltern.replace(' ', '_')}_{tag}" def _add_constraint_verfuegbarkeit( self, prob: pulp.LpProblem, x: Entscheidungsvariablen ) -> None: """C3: Dienste nur verfügbaren Eltern zuteilen""" for eltern in self.daten.eltern: for tag in self.daten.planungszeitraum: if not self.daten.verfügbarkeit.get((eltern, tag), True): for dienst in self.daten.dienste: if (eltern, tag, dienst) in x: prob += x[eltern, tag, dienst] == 0, \ f"C3_{eltern.replace(' ', '_')}_{tag}_{dienst.kuerzel}" def _add_constraint_dienst_bedarf( self, prob: pulp.LpProblem, x: Entscheidungsvariablen ) -> None: """C4: Alle benötigten Dienste müssen zugeteilt werden""" for tag in self.daten.planungszeitraum: for dienst in self.daten.benoetigte_dienste.get(tag, []): dienst_vars = [] for eltern in self.daten.eltern: if (eltern, tag, dienst) in x: # Prüfe ob Eltern verfügbar if self.daten.verfügbarkeit.get((eltern, tag), True): dienst_vars.append(x[eltern, tag, dienst]) if dienst_vars: # Anzahl benötigter Personen pro Dienst (aus Dienst-Objekt) benoetigte_personen = dienst.personen_anzahl prob += pulp.lpSum(dienst_vars) == benoetigte_personen, \ f"Bedarf_{tag}_{dienst.kuerzel}" def _add_fairness_constraints( self, prob: pulp.LpProblem, x: Entscheidungsvariablen, ziel_dienste: Zielverteilung, constraint_prefix: str ) -> Dict: """Erstellt Fairness-Variablen und fügt Fairness-Constraints hinzu Args: prob: Das LP-Problem x: Die Entscheidungsvariablen ziel_dienste: Die Zielverteilung der Dienste constraint_prefix: Präfix für Constraint-Namen ('lokal' oder 'global') Returns: Dictionary mit Fairness-Abweichungsvariablen """ # Hilfsvariablen für Fairness-Abweichungen erstellen fairness_abweichung = {} for eltern in self.daten.eltern: for dienst in self.daten.dienste: fairness_abweichung[eltern, dienst] = pulp.LpVariable( f"fair_{constraint_prefix}_{eltern.replace(' ', '_')}_{dienst.kuerzel}", lowBound=0) # Fairness-Constraints hinzufügen for eltern in self.daten.eltern: for dienst in self.daten.dienste: # Tatsächliche Dienste im aktuellen Planungszeitraum zugeteilte_dienste_planungszeitraum = pulp.lpSum( x[eltern, tag, dienst] for tag in self.daten.planungszeitraum if (eltern, tag, dienst) in x ) # Ziel für diese Fairness-Variante ziel = ziel_dienste[eltern][dienst] prob += (zugeteilte_dienste_planungszeitraum - ziel <= fairness_abweichung[eltern, dienst]) prob += (ziel - zugeteilte_dienste_planungszeitraum <= fairness_abweichung[eltern, dienst]) return fairness_abweichung def _add_constraint_gesamtfairness( self, prob: pulp.LpProblem, x: Entscheidungsvariablen, ziel_dienste: Zielverteilung, constraint_prefix: str ) -> Dict: """F3: Dienstübergreifende Fairness - verhindert Häufung bei einzelnen Eltern Berechnet die Abweichung der Gesamtdienstanzahl (über alle Diensttypen) vom fairen Gesamtziel. Dies verhindert, dass einzelne Eltern über alle Diensttypen hinweg überproportional viele Dienste bekommen. Args: prob: Das LP-Problem x: Die Entscheidungsvariablen ziel_dienste: Die Zielverteilung (global oder lokal) constraint_prefix: Präfix für Constraint-Namen ('lokal' oder 'global') Returns: Dictionary mit Gesamt-Fairness-Abweichungsvariablen """ fairness_abweichung_gesamt = {} for eltern in self.daten.eltern: fairness_abweichung_gesamt[eltern] = pulp.LpVariable( f"fair_gesamt_{constraint_prefix}_{eltern.replace(' ', '_')}", lowBound=0) # Tatsächliche Gesamtdienste für diesen Elternteil tatsaechliche_dienste_gesamt = pulp.lpSum( x[eltern, tag, dienst] for tag in self.daten.planungszeitraum for dienst in self.daten.dienste if (eltern, tag, dienst) in x ) # Ziel-Gesamtdienste für diesen Elternteil (Summe über alle Dienste) ziel_gesamt = sum(ziel_dienste[eltern][dienst] for dienst in self.daten.dienste) # Fairness-Constraints prob += (tatsaechliche_dienste_gesamt - ziel_gesamt <= fairness_abweichung_gesamt[eltern]) prob += (ziel_gesamt - tatsaechliche_dienste_gesamt <= fairness_abweichung_gesamt[eltern]) return fairness_abweichung_gesamt def _erstelle_zielfunktion( self, prob: pulp.LpProblem, x: Entscheidungsvariablen, fairness_abweichung_lokal: Dict, fairness_abweichung_global: Dict, fairness_abweichung_gesamt_global: Dict, fairness_abweichung_gesamt_lokal: Dict ) -> None: """Erstellt die Zielfunktion mit Fairness und Präferenzen""" objective_terms = [] # Fairness-Gewichtung gewicht_global = 40 gewicht_lokal = 60 gewicht_f1 = gewicht_global gewicht_f2 = gewicht_lokal gewicht_f3_global = 0.25 * gewicht_global gewicht_f3_lokal = 0.25 * gewicht_lokal # Fairness-Terme zur Zielfunktion hinzufügen for eltern in self.daten.eltern: for dienst in self.daten.dienste: objective_terms.append(gewicht_f1 * fairness_abweichung_global[eltern, dienst]) objective_terms.append(gewicht_f2 * fairness_abweichung_lokal[eltern, dienst]) # F3: Gesamtfairness (dienstübergreifend) - global und lokal objective_terms.append(gewicht_f3_global * fairness_abweichung_gesamt_global[eltern]) objective_terms.append(gewicht_f3_lokal * fairness_abweichung_gesamt_lokal[eltern]) # P1: Bevorzugte Dienste (positiv belohnen) for (eltern, tag, dienst), präf in self.daten.präferenzen.items(): if (eltern, tag, dienst) in x and präf == 1: # bevorzugt objective_terms.append(-5 * x[eltern, tag, dienst]) # P2: Abgelehnte Dienste (bestrafen) for (eltern, tag, dienst), präf in self.daten.präferenzen.items(): if (eltern, tag, dienst) in x and präf == -1: # abgelehnt objective_terms.append(25 * x[eltern, tag, dienst]) # Zielfunktion setzen if objective_terms: prob += pulp.lpSum(objective_terms) else: # Fallback: Minimiere Gesamtanzahl Dienste prob += pulp.lpSum([var for var in x.values()]) print(f"Verwende Gewichtung: F1 (global) = {gewicht_f1}, F2 (lokal) = {gewicht_f2}, " f"F3_global = {gewicht_f3_global}, F3_lokal = {gewicht_f3_lokal}") def erstelle_optimierungsmodell(self) -> Tuple[ pulp.LpProblem, Entscheidungsvariablen ]: """Erstellt das PuLP Optimierungsmodell Returns: Tuple mit (prob, x, ziel_dienste_lokal, ziel_dienste_global) """ print("Erstelle Optimierungsmodell...") # Debugging: Verfügbarkeit prüfen print("\nDebug: Verfügbarkeit analysieren...") for tag in self.daten.planungszeitraum[:5]: # Erste 5 Tage verfügbare = [e for e in self.daten.eltern if self.daten.verfügbarkeit.get((e, tag), True)] benötigte = self.daten.benoetigte_dienste.get(tag, []) print(f" {tag}: Benötigt {len(benötigte)} Dienste {benötigte}, verfügbar: {verfügbare}") # LP Problem erstellen prob = pulp.LpProblem("Elterndienstplaner", pulp.LpMinimize) # Entscheidungsvariablen erstellen x = self._erstelle_entscheidungsvariablen() # Grundlegende Constraints hinzufügen self._add_constraint_ein_dienst_pro_woche(prob, x) self._add_constraint_ein_dienst_pro_tag(prob, x) self._add_constraint_verfuegbarkeit(prob, x) self._add_constraint_dienst_bedarf(prob, x) # Fairness-Constraints ziel_dienste_global = self.berechne_faire_zielverteilung_global() ziel_dienste_lokal = self.berechne_faire_zielverteilung_lokal() # Observer Pattern: Notify ausgabe about target distributions self.ausgabe.setze_zielverteilungen(ziel_dienste_lokal, ziel_dienste_global) # F2: Lokale Fairness-Constraints fairness_abweichung_lokal = self._add_fairness_constraints( prob, x, ziel_dienste_lokal, "lokal" ) # F1: Globale Fairness-Constraints fairness_abweichung_global = self._add_fairness_constraints( prob, x, ziel_dienste_global, "global" ) # F3: Dienstübergreifende Fairness - Global fairness_abweichung_gesamt_global = self._add_constraint_gesamtfairness( prob, x, ziel_dienste_global, "global" ) # F3: Dienstübergreifende Fairness - Lokal fairness_abweichung_gesamt_lokal = self._add_constraint_gesamtfairness( prob, x, ziel_dienste_lokal, "lokal" ) # Zielfunktion erstellen self._erstelle_zielfunktion(prob, x, fairness_abweichung_lokal, fairness_abweichung_global, fairness_abweichung_gesamt_global, fairness_abweichung_gesamt_lokal) print(f"Modell erstellt mit {len(x)} Variablen und {len(prob.constraints)} Constraints") return prob, x def löse_optimierung(self, prob: pulp.LpProblem, x: Entscheidungsvariablen) -> Optional[Dict[date, Dict[Dienst, List[Eltern]]]]: """Löst das Optimierungsproblem""" print("Löse Optimierungsproblem...") # Solver wählen (verfügbare Solver testen) solver = None try: print("Versuche CBC Solver...") solver = pulp.PULP_CBC_CMD(msg=0, timeLimit=10) # Standard CBC Solver except: try: print("Versuche GLPK Solver...") solver = pulp.GLPK_CMD(msg=0) # GLPK falls verfügbar except: print("Kein spezifizierter Solver verfügbar, verwende Standard.") solver = None # Default Solver prob.solve(solver) status = pulp.LpStatus[prob.status] print(f"Optimierung abgeschlossen: {status}") if prob.status != pulp.LpStatusOptimal: print("WARNUNG: Keine optimale Lösung gefunden!") return None # Lösung extrahieren lösung: Dict[date, Dict[Dienst, List[Eltern]]] = {} for (eltern, tag, dienst), var in x.items(): if var.varValue and var.varValue > 0.5: # Binary variable ist 1 if tag not in lösung: lösung[tag] = {} if dienst not in lösung[tag]: lösung[tag][dienst] = [] lösung[tag][dienst].append(eltern) return lösung def main() -> None: if len(sys.argv) < 4: print("Usage: ./elterndienstplaner.py []") sys.exit(1) eingabe_datei = sys.argv[1] eltern_datei = sys.argv[2] ausgabe_datei = sys.argv[3] vorherige_datei = sys.argv[4] if len(sys.argv) > 4 else None print("Elterndienstplaner gestartet") print("="*50) try: # Create data model and load data daten = ElterndienstplanerDaten() daten.lade_daten(eingabe_datei, eltern_datei, vorherige_datei) # Create output handler and optimization engine ausgabe = ElterndienstAusgabe(daten) planer = Elterndienstplaner(daten, ausgabe) # Optimierung prob, x = planer.erstelle_optimierungsmodell() lösung = planer.löse_optimierung(prob, x) if lösung is not None: # Ergebnisse ausgeben ausgabe.schreibe_ausgabe_csv(ausgabe_datei, lösung) ausgabe.drucke_statistiken(lösung) # Visualisierung der Verteilungen (uses Observer Pattern targets) ausgabe.visualisiere_verteilungen(lösung) # Visualisierung der Präferenz-Verletzungen ausgabe.visualisiere_praeferenz_verletzungen(lösung) print("\n✓ Planung erfolgreich abgeschlossen!") else: print("\n✗ Fehler: Keine gültige Lösung gefunden!") sys.exit(1) except Exception as e: print(f"\n✗ Fehler: {e}") import traceback traceback.print_exc() sys.exit(1) if __name__ == "__main__": main()