#!/usr/bin/env python3 """ Elterndienstplaner - Optimale Zuteilung von Elterndiensten Autor: Automatisch generiert Datum: Dezember 2025 """ import sys import pulp from datetime import timedelta, date from collections import defaultdict from typing import Dict, List, Tuple, DefaultDict, Optional from datenmodell import ElterndienstplanerDaten, Dienst, Eltern, Zielverteilung, Entscheidungsvariablen from ausgabe import ElterndienstAusgabe class Elterndienstplaner: """Optimierungs-Engine fuer Elterndienstplanung""" def __init__(self, daten: ElterndienstplanerDaten, ausgabe: ElterndienstAusgabe) -> None: self.daten = daten self.ausgabe = ausgabe def berechne_faire_zielverteilung_global(self) -> Zielverteilung: """Berechnet die faire Zielanzahl von Diensten fuer den Planungszeitraum basierend auf globaler Fairness (Historie + aktueller Planungszeitraum). Gibt die Ziel-Dienstanzahl fuer den aktuellen Planungszeitraum zurueck, korrigiert um bereits geleistete Dienste. Kann negativ sein, wenn bereits mehr Dienste geleistet wurden als fair waere.""" ziel_dienste: Zielverteilung = defaultdict(lambda: defaultdict(float)) print("\nBerechne faire Zielverteilung basierend auf historischen Daten...") historische_tage = set(datum for datum, _, _ in self.daten.historische_dienste) if self.daten.historische_dienste else set() print(f" Analysiere {len(historische_tage)} historische Tage mit {len(self.daten.historische_dienste)} Diensten") for dienst in self.daten.dienste: print(f" Verarbeite Dienst {dienst.kuerzel}...") # 1. HISTORISCHE PERIODE: Faire Umverteilung historische_dienste_dieses_typs = [ (datum, eltern) for datum, eltern, d in self.daten.historische_dienste if d == dienst ] print(f" Gefundene historische {dienst.kuerzel}-Dienste: {len(historische_dienste_dieses_typs)}") dienste_pro_tag = defaultdict(list) for datum, eltern in historische_dienste_dieses_typs: dienste_pro_tag[datum].append(eltern) for tag, geleistete_eltern in dienste_pro_tag.items(): anzahl_dienste = len(geleistete_eltern) gesamt_dienstfaktor_tag = 0 for eltern in self.daten.eltern: gesamt_dienstfaktor_tag += self.daten.dienstfaktoren[eltern][tag] if gesamt_dienstfaktor_tag > 0: for eltern in self.daten.eltern: if self.daten.dienstfaktoren[eltern][tag] > 0: anteil = self.daten.dienstfaktoren[eltern][tag] / gesamt_dienstfaktor_tag faire_zuteilung = anteil * anzahl_dienste ziel_dienste[eltern][dienst] += faire_zuteilung if faire_zuteilung > 0.01: print(f" {tag}: {eltern} Faktor={self.daten.dienstfaktoren[eltern][tag]} " f"-> {faire_zuteilung:.2f} von {anzahl_dienste} Diensten") # 2. AKTUELLER PLANUNGSZEITRAUM: Faire Verteilung benoetigte_dienste_planungszeitraum = 0 for tag in self.daten.planungszeitraum: if dienst not in self.daten.benoetigte_dienste.get(tag, []): continue benoetigte_dienste_planungszeitraum += dienst.personen_anzahl dienstfaktoren = {} gesamt_dienstfaktor_tag = 0 for eltern in self.daten.eltern: faktor = self.daten.dienstfaktoren[eltern][tag] dienstfaktoren[eltern] = faktor gesamt_dienstfaktor_tag += faktor if gesamt_dienstfaktor_tag > 0: for eltern in self.daten.eltern: anteil = dienstfaktoren[eltern] / gesamt_dienstfaktor_tag faire_zuteilung = anteil * dienst.personen_anzahl ziel_dienste[eltern][dienst] += faire_zuteilung # 3. ABZUG DER BEREITS GELEISTETEN DIENSTE for eltern in self.daten.eltern: vorherige_anzahl = sum( 1 for _, hist_eltern, hist_dienst in self.daten.historische_dienste if hist_eltern == eltern and hist_dienst == dienst ) ziel_dienste[eltern][dienst] -= vorherige_anzahl return ziel_dienste def berechne_faire_zielverteilung_lokal(self) -> Zielverteilung: """Berechnet die lokale faire Zielanzahl von Diensten pro Eltern-Dienst-Kombination basierend auf Dienstfaktoren und benoetigten Diensten im aktuellen Planungszeitraum WICHTIG: Bei der lokalen Fairness werden Abwesenheitstage NICHT in die Dienstpflicht eingerechnet (Dienstfaktor = 0 an Abwesenheitstagen). Das führt zu einer gleichmäßigeren Verteilung im aktuellen Monat und verhindert, dass Familien mit längeren Abwesenheiten in den wenigen verfügbaren Tagen überproportional viele Dienste bekommen. Die "verpassten" Dienste werden dann über die globale Fairness (F1) im Jahresverlauf ausgeglichen.""" ziel_dienste_lokal: Zielverteilung = defaultdict(lambda: defaultdict(float)) print("\nBerechne lokale faire Zielverteilung für aktuellen Planungszeitraum...") print(" (Abwesenheitstage werden aus der Dienstpflicht herausgerechnet)") summe_dienstfaktor_planungszeitraum_alle_eltern = sum( sum(self.daten.dienstfaktoren[e][tag] for tag in self.daten.planungszeitraum) for e in self.daten.eltern ) if summe_dienstfaktor_planungszeitraum_alle_eltern == 0: print(" WARNUNG: Gesamtdienstfaktor ist 0, keine lokale Zielverteilung möglich") return ziel_dienste_lokal for dienst in self.daten.dienste: benoetigte_dienste_planungszeitraum = sum( 1 for tag in self.daten.planungszeitraum if dienst in self.daten.benoetigte_dienste.get(tag, []) ) benoetigte_dienste_planungszeitraum *= dienst.personen_anzahl if benoetigte_dienste_planungszeitraum > 0: print(f" {dienst.kuerzel}: {benoetigte_dienste_planungszeitraum} Dienste benötigt") for eltern in self.daten.eltern: summe_dienstfaktor_planungszeitraum = sum( self.daten.dienstfaktoren[eltern][tag] for tag in self.daten.planungszeitraum ) if summe_dienstfaktor_planungszeitraum > 0: anteil = summe_dienstfaktor_planungszeitraum / summe_dienstfaktor_planungszeitraum_alle_eltern faire_zuteilung = anteil * benoetigte_dienste_planungszeitraum ziel_dienste_lokal[eltern][dienst] = faire_zuteilung return ziel_dienste_lokal def _erstelle_entscheidungsvariablen(self) -> Entscheidungsvariablen: """Erstellt die binaeren Entscheidungsvariablen x[eltern, tag, dienst]""" x: Entscheidungsvariablen = {} for eltern in self.daten.eltern: for tag in self.daten.planungszeitraum: for dienst in self.daten.dienste: if dienst in self.daten.benoetigte_dienste.get(tag, []): x[eltern, tag, dienst] = pulp.LpVariable( f"x_{eltern.replace(' ', '_')}_{tag}_{dienst.kuerzel}", cat='Binary' ) return x def _add_constraint_ein_dienst_pro_woche( self, prob: pulp.LpProblem, x: Entscheidungsvariablen ) -> None: """C1: Je Eltern und Dienst nur einmal die Woche (Woche = Montag bis Sonntag)""" erster_tag = self.daten.planungszeitraum[0] # Finde Montag am oder vor dem ersten Planungstag woche_start = erster_tag - timedelta(days=erster_tag.weekday()) woche_nr = 0 letzter_tag = self.daten.planungszeitraum[-1] while woche_start <= letzter_tag: woche_ende = woche_start + timedelta(days=6) for eltern in self.daten.eltern: for dienst in self.daten.dienste: woche_vars = [] # Zaehle historische Dienste in dieser Woche (VOR Planungszeitraum) historische_dienste_in_woche = 0 if woche_start < erster_tag: for hist_datum, hist_eltern, hist_dienst in self.daten.historische_dienste: if (hist_eltern == eltern and hist_dienst == dienst and woche_start <= hist_datum < erster_tag): historische_dienste_in_woche += 1 for tag in self.daten.planungszeitraum: if woche_start <= tag <= woche_ende: if (eltern, tag, dienst) in x: woche_vars.append(x[eltern, tag, dienst]) if woche_vars: prob += pulp.lpSum(woche_vars) <= 1 - historische_dienste_in_woche, \ f"C1_{eltern.replace(' ', '_')}_{dienst.kuerzel}_w{woche_nr}" woche_start += timedelta(days=7) woche_nr += 1 def _add_constraint_ein_dienst_pro_tag( self, prob: pulp.LpProblem, x: Entscheidungsvariablen ) -> None: """C2: Je Eltern nur einen Dienst am Tag""" for eltern in self.daten.eltern: for tag in self.daten.planungszeitraum: tag_vars = [] for dienst in self.daten.dienste: if (eltern, tag, dienst) in x: tag_vars.append(x[eltern, tag, dienst]) if tag_vars: prob += pulp.lpSum(tag_vars) <= 1, f"C2_{eltern.replace(' ', '_')}_{tag}" def _add_constraint_verfuegbarkeit( self, prob: pulp.LpProblem, x: Entscheidungsvariablen ) -> None: """C3: Dienste nur verfuegbaren Eltern zuteilen""" for eltern in self.daten.eltern: for tag in self.daten.planungszeitraum: if not self.daten.verfuegbarkeit.get((eltern, tag), True): for dienst in self.daten.dienste: if (eltern, tag, dienst) in x: prob += x[eltern, tag, dienst] == 0, \ f"C3_{eltern.replace(' ', '_')}_{tag}_{dienst.kuerzel}" def _add_constraint_dienst_bedarf( self, prob: pulp.LpProblem, x: Entscheidungsvariablen ) -> None: """C4: Alle benoetigten Dienste muessen zugeteilt werden""" for tag in self.daten.planungszeitraum: for dienst in self.daten.benoetigte_dienste.get(tag, []): dienst_vars = [] for eltern in self.daten.eltern: if (eltern, tag, dienst) in x: if self.daten.verfuegbarkeit.get((eltern, tag), True): dienst_vars.append(x[eltern, tag, dienst]) if dienst_vars: # Anzahl benoetigter Personen pro Dienst benoetigte_personen = dienst.personen_anzahl prob += pulp.lpSum(dienst_vars) == benoetigte_personen, \ f"Bedarf_{tag}_{dienst.kuerzel}" def _add_constraint_fairness_diensttypspezifisch( self, prob: pulp.LpProblem, x: Entscheidungsvariablen, ziel_dienste: Zielverteilung, constraint_prefix: str ) -> Dict: """F1/F2: Fairness pro Diensttyp - gleicht Anzahl je Diensttyp aus Berechnet die Abweichung der zugeteilten Dienste vom fairen Ziel fuer jeden Diensttyp separat. Dies sorgt dafuer, dass z.B. Kochdienste und Putzdienste jeweils fair verteilt werden. F1 (global): Basierend auf historischen Daten + aktuellem Planungszeitraum F2 (lokal): Nur fuer den aktuellen Planungszeitraum Args: prob: Das LP-Problem x: Die Entscheidungsvariablen ziel_dienste: Die Zielverteilung (global oder lokal) constraint_prefix: Praefix fuer Constraint-Namen ('global' fuer F1, 'lokal' fuer F2) Returns: Dictionary mit Fairness-Abweichungsvariablen pro Diensttyp """ fairness_abweichung = {} for eltern in self.daten.eltern: for dienst in self.daten.dienste: fairness_abweichung[eltern, dienst] = pulp.LpVariable( f"fair_{constraint_prefix}_{eltern.replace(' ', '_')}_{dienst.kuerzel}", lowBound=0) for eltern in self.daten.eltern: for dienst in self.daten.dienste: zugeteilte_dienste_planungszeitraum = pulp.lpSum( x[eltern, tag, dienst] for tag in self.daten.planungszeitraum if (eltern, tag, dienst) in x ) ziel = ziel_dienste[eltern][dienst] prob += (zugeteilte_dienste_planungszeitraum - ziel <= fairness_abweichung[eltern, dienst]) prob += (ziel - zugeteilte_dienste_planungszeitraum <= fairness_abweichung[eltern, dienst]) return fairness_abweichung def _add_constraint_fairness_typuebergreifend( self, prob: pulp.LpProblem, x: Entscheidungsvariablen, ziel_dienste: Zielverteilung, constraint_prefix: str ) -> Dict: """F3/F4: Diensttypuebergreifende Fairness - verhindert Haeufung bei einzelnen Familien Berechnet die Abweichung der Gesamtdienstanzahl (ueber alle Diensttypen) vom fairen Gesamtziel. Dies verhindert, dass einzelne Familien ueber alle Diensttypen hinweg ueberproportional viele Dienste bekommen. F3 (global): Basierend auf historischen Daten + aktuellem Planungszeitraum F4 (lokal): Nur fuer den aktuellen Planungszeitraum Args: prob: Das LP-Problem x: Die Entscheidungsvariablen ziel_dienste: Die Zielverteilung (global oder lokal) constraint_prefix: Praefix fuer Constraint-Namen ('global' fuer F3, 'lokal' fuer F4) Returns: Dictionary mit Gesamt-Fairness-Abweichungsvariablen ueber alle Diensttypen """ fairness_abweichung_gesamt = {} for eltern in self.daten.eltern: fairness_abweichung_gesamt[eltern] = pulp.LpVariable( f"fair_gesamt_{constraint_prefix}_{eltern.replace(' ', '_')}", lowBound=0) tatsaechliche_dienste_gesamt = pulp.lpSum( x[eltern, tag, dienst] for tag in self.daten.planungszeitraum for dienst in self.daten.dienste if (eltern, tag, dienst) in x ) ziel_gesamt = sum(ziel_dienste[eltern][dienst] for dienst in self.daten.dienste) prob += (tatsaechliche_dienste_gesamt - ziel_gesamt <= fairness_abweichung_gesamt[eltern]) prob += (ziel_gesamt - tatsaechliche_dienste_gesamt <= fairness_abweichung_gesamt[eltern]) return fairness_abweichung_gesamt def _erstelle_zielfunktion( self, prob: pulp.LpProblem, x: Entscheidungsvariablen, fairness_abweichung_lokal: Dict, fairness_abweichung_global: Dict, fairness_abweichung_gesamt_global: Dict, fairness_abweichung_gesamt_lokal: Dict ) -> None: """Erstellt die Zielfunktion mit Fairness und Praeferenzen""" objective_terms = [] gewicht_global = 40 gewicht_lokal = 60 gewicht_f1 = gewicht_global gewicht_f2 = gewicht_lokal gewicht_f3_global = 0.25 * gewicht_global gewicht_f4_lokal = 0.25 * gewicht_lokal for eltern in self.daten.eltern: for dienst in self.daten.dienste: objective_terms.append(gewicht_f1 * fairness_abweichung_global[eltern, dienst]) objective_terms.append(gewicht_f2 * fairness_abweichung_lokal[eltern, dienst]) objective_terms.append(gewicht_f3_global * fairness_abweichung_gesamt_global[eltern]) objective_terms.append(gewicht_f4_lokal * fairness_abweichung_gesamt_lokal[eltern]) # P1: Bevorzugte Dienste for (eltern, tag, dienst), praef in self.daten.praeferenzen.items(): if (eltern, tag, dienst) in x and praef == 1: objective_terms.append(-5 * x[eltern, tag, dienst]) # P2: Abgelehnte Dienste for (eltern, tag, dienst), praef in self.daten.praeferenzen.items(): if (eltern, tag, dienst) in x and praef == -1: objective_terms.append(25 * x[eltern, tag, dienst]) if objective_terms: prob += pulp.lpSum(objective_terms) else: prob += pulp.lpSum([var for var in x.values()]) print(f"Verwende Gewichtung: F1 (global) = {gewicht_f1}, F2 (lokal) = {gewicht_f2}, " f"F3 (global) = {gewicht_f3_global}, F4 (lokal) = {gewicht_f4_lokal}") def erstelle_optimierungsmodell(self) -> Tuple[ pulp.LpProblem, Entscheidungsvariablen ]: """Erstellt das PuLP Optimierungsmodell Returns: Tuple mit (prob, x, ziel_dienste_lokal, ziel_dienste_global) """ print("Erstelle Optimierungsmodell...") print("\nDebug: Verfuegbarkeit analysieren...") for tag in self.daten.planungszeitraum[:5]: verfuegbare = [e for e in self.daten.eltern if self.daten.verfuegbarkeit.get((e, tag), True)] benoetigte = self.daten.benoetigte_dienste.get(tag, []) print(f" {tag}: Benötigt {len(benoetigte)} Dienste {benoetigte}, verfügbar: {verfuegbare}") prob = pulp.LpProblem("Elterndienstplaner", pulp.LpMinimize) x = self._erstelle_entscheidungsvariablen() self._add_constraint_ein_dienst_pro_woche(prob, x) self._add_constraint_ein_dienst_pro_tag(prob, x) self._add_constraint_verfuegbarkeit(prob, x) self._add_constraint_dienst_bedarf(prob, x) ziel_dienste_global = self.berechne_faire_zielverteilung_global() ziel_dienste_lokal = self.berechne_faire_zielverteilung_lokal() self.ausgabe.setze_zielverteilungen(ziel_dienste_lokal, ziel_dienste_global) # F2: Lokale Fairness pro Diensttyp fairness_abweichung_lokal = self._add_constraint_fairness_diensttypspezifisch( prob, x, ziel_dienste_lokal, "lokal" ) # F1: Globale Fairness pro Diensttyp fairness_abweichung_global = self._add_constraint_fairness_diensttypspezifisch( prob, x, ziel_dienste_global, "global" ) # F3: Diensttypuebergreifende Fairness (global) fairness_abweichung_gesamt_global = self._add_constraint_fairness_typuebergreifend( prob, x, ziel_dienste_global, "global" ) # F4: Diensttypuebergreifende Fairness (lokal) fairness_abweichung_gesamt_lokal = self._add_constraint_fairness_typuebergreifend( prob, x, ziel_dienste_lokal, "lokal" ) self._erstelle_zielfunktion(prob, x, fairness_abweichung_lokal, fairness_abweichung_global, fairness_abweichung_gesamt_global, fairness_abweichung_gesamt_lokal) print(f"Modell erstellt mit {len(x)} Variablen und {len(prob.constraints)} Constraints") return prob, x def loese_optimierung(self, prob: pulp.LpProblem, x: Entscheidungsvariablen) -> Optional[Dict[date, Dict[Dienst, List[Eltern]]]]: """Loest das Optimierungsproblem""" print("Löse Optimierungsproblem...") solver = None try: print("Versuche CBC Solver...") solver = pulp.PULP_CBC_CMD(msg=0, timeLimit=10) except: try: print("Versuche GLPK Solver...") solver = pulp.GLPK_CMD(msg=0) except: print("Kein spezifizierter Solver verfügbar, verwende Standard.") solver = None prob.solve(solver) status = pulp.LpStatus[prob.status] print(f"Optimierung abgeschlossen: {status}") if prob.status != pulp.LpStatusOptimal: print("WARNUNG: Keine optimale Lösung gefunden!") return None loesung: Dict[date, Dict[Dienst, List[Eltern]]] = {} for (eltern, tag, dienst), var in x.items(): if var.varValue and var.varValue > 0.5: if tag not in loesung: loesung[tag] = {} if dienst not in loesung[tag]: loesung[tag][dienst] = [] loesung[tag][dienst].append(eltern) return loesung def main() -> None: if len(sys.argv) < 4: print("Usage: ./elterndienstplaner.py []") sys.exit(1) eingabe_datei = sys.argv[1] eltern_datei = sys.argv[2] ausgabe_datei = sys.argv[3] vorherige_datei = sys.argv[4] if len(sys.argv) > 4 else None print("Elterndienstplaner gestartet") print("="*50) try: daten = ElterndienstplanerDaten() daten.lade_daten(eingabe_datei, eltern_datei, vorherige_datei) ausgabe = ElterndienstAusgabe(daten) planer = Elterndienstplaner(daten, ausgabe) prob, x = planer.erstelle_optimierungsmodell() loesung = planer.loese_optimierung(prob, x) if loesung is not None: ausgabe.schreibe_ausgabe_csv(ausgabe_datei, loesung) ausgabe.drucke_statistiken(loesung) ausgabe.visualisiere_verteilungen(loesung) ausgabe.visualisiere_praeferenz_verletzungen(loesung) print("\n✓ Planung erfolgreich abgeschlossen!") else: print("\n✗ Fehler: Keine gültige Lösung gefunden!") sys.exit(1) except Exception as e: print(f"\n✗ Fehler: {e}") import traceback traceback.print_exc() sys.exit(1) if __name__ == "__main__": main()