#!/usr/bin/env python3 """ Elterndienstplaner - Optimale Zuteilung von Elterndiensten Autor: Automatisch generiert Datum: Dezember 2025 """ import sys import csv import pulp from datetime import datetime, timedelta from collections import defaultdict import calendar class Elterndienstplaner: def __init__(self): self.dienste = ['F', 'P', 'E', 'K', 'A'] # Frühstück, Putz, Essen, Kochen, Elternabend self.dienst_namen = { 'F': 'Frühstücksdienst', 'P': 'Putznotdienst', 'E': 'Essensausgabenotdienst', 'K': 'Kochen', 'A': 'Elternabend' } # Datenstrukturen self.tage = [] self.eltern = [] self.benoetigte_dienste = {} # {datum: [dienste]} self.verfügbarkeit = {} # {(eltern, datum): bool} self.präferenzen = {} # {(eltern, datum, dienst): 1 (bevorzugt) oder -1 (abgelehnt)} self.dienstfaktoren = {} # {eltern: {datum: faktor}} self.alle_zeitraeume = {} # {eltern: [(beginn, ende, faktor), ...]} - ALLE Zeiträume self.vorherige_dienste = defaultdict(lambda: defaultdict(int)) # {eltern: {dienst: anzahl}} self.historische_dienste = [] # [(datum, eltern, dienst), ...] - Alle historischen Dienste mit Datum def lade_eingabe_csv(self, datei): """Lädt die eingabe.csv mit Terminen und Präferenzen""" print(f"Lade Eingabedaten aus {datei}...") with open(datei, 'r', encoding='utf-8') as f: reader = csv.reader(f) header = next(reader) # Eltern aus Header extrahieren (ab Spalte 3) self.eltern = [name.strip() for name in header[3:] if name.strip()] print(f"Gefundene Eltern: {self.eltern}") for row in reader: if len(row) < 3: continue datum = row[0].strip() wochentag = row[1].strip() dienste_str = row[2].strip() # Datum parsen try: datum_obj = datetime.strptime(datum, '%Y-%m-%d').date() self.tage.append(datum_obj) except ValueError: print(f"Warnung: Ungültiges Datum {datum}") continue # Benötigte Dienste self.benoetigte_dienste[datum_obj] = list(dienste_str) # Verfügbarkeit und Präferenzen der Eltern for i, eltern_name in enumerate(self.eltern): if i + 3 < len(row): präf_str = row[i + 3].strip() # Verfügbarkeit prüfen if präf_str == 'x': self.verfügbarkeit[(eltern_name, datum_obj)] = False else: self.verfügbarkeit[(eltern_name, datum_obj)] = True # Präferenzen parsen self._parse_präferenzen(eltern_name, datum_obj, präf_str) else: # Standard: verfügbar, keine Präferenzen self.verfügbarkeit[(eltern_name, datum_obj)] = True self.tage.sort() print(f"Zeitraum: {self.tage[0]} bis {self.tage[-1]} ({len(self.tage)} Tage)") def _parse_präferenzen(self, eltern, datum, präf_str): """Parst Präferenzstring wie 'F+P-E+' """ i = 0 while i < len(präf_str): if i + 1 < len(präf_str) and präf_str[i] in self.dienste: dienst = präf_str[i] if i + 1 < len(präf_str): if präf_str[i + 1] == '+': self.präferenzen[(eltern, datum, dienst)] = 1 i += 2 elif präf_str[i + 1] == '-': self.präferenzen[(eltern, datum, dienst)] = -1 i += 2 else: i += 1 else: i += 1 else: i += 1 def lade_eltern_csv(self, datei): """Lädt die eltern.csv mit Dienstfaktoren""" print(f"Lade Elterndaten aus {datei}...") with open(datei, 'r', encoding='utf-8') as f: reader = csv.reader(f) header = next(reader) for row in reader: if len(row) < 4: continue eltern_name = row[0].strip() # Initialisiere Datenstrukturen self.dienstfaktoren[eltern_name] = {} self.alle_zeitraeume[eltern_name] = [] # Alle Zeiträume einlesen (jeweils 3 Spalten: Beginn, Ende, Faktor) for i in range(1, len(row), 3): if i + 2 < len(row) and row[i].strip() and row[i + 1].strip(): try: beginn = datetime.strptime(row[i].strip(), '%Y-%m-%d').date() ende = datetime.strptime(row[i + 1].strip(), '%Y-%m-%d').date() faktor = float(row[i + 2].strip()) if row[i + 2].strip() else 0 # Zeitraum speichern self.alle_zeitraeume[eltern_name].append((beginn, ende, faktor)) # Faktor für Tage im aktuellen Planungsmonat setzen for tag in self.tage: if beginn <= tag <= ende: self.dienstfaktoren[eltern_name][tag] = faktor except (ValueError, IndexError): continue # Tage ohne expliziten Faktor auf 0 setzen for tag in self.tage: if tag not in self.dienstfaktoren[eltern_name]: self.dienstfaktoren[eltern_name][tag] = 0 print(f"Dienstfaktoren geladen für {len(self.dienstfaktoren)} Eltern") print(f"Zeiträume gespeichert für globale Fairness-Berechnung") def lade_vorherige_ausgaben_csv(self, datei): """Lädt vorherige-ausgaben.csv für Fairness-Constraints""" print(f"Lade vorherige Ausgaben aus {datei}...") try: with open(datei, 'r', encoding='utf-8') as f: reader = csv.reader(f) header = next(reader) # Dienst-Spalten finden dienst_spalten = {} for i, col_name in enumerate(header[2:], 2): # Ab Spalte 2 (nach Datum, Wochentag) for dienst_kürzel, dienst_name in self.dienst_namen.items(): if dienst_name.lower() in col_name.lower() or dienst_kürzel == col_name: dienst_spalten[dienst_kürzel] = i break for row in reader: if len(row) < 3: continue # Datum parsen try: datum = datetime.strptime(row[0].strip(), '%Y-%m-%d').date() except ValueError: continue # Zugeteilte Dienste zählen UND mit Datum speichern for dienst, spalte_idx in dienst_spalten.items(): if spalte_idx < len(row) and row[spalte_idx].strip(): # Mehrere Eltern können in einer Zelle stehen (durch Leerzeichen getrennt) eltern_liste = row[spalte_idx].strip().split() for eltern_name in eltern_liste: if eltern_name in self.eltern: # Summierung für Kompatibilität self.vorherige_dienste[eltern_name][dienst] += 1 # Historische Dienste mit Datum speichern self.historische_dienste.append((datum, eltern_name, dienst)) except FileNotFoundError: print("Keine vorherigen Ausgaben gefunden - starte ohne historische Daten") print(f"Vorherige Dienste geladen: {dict(self.vorherige_dienste)}") print(f"Historische Dienste mit Datum: {len(self.historische_dienste)} Einträge") def berechne_dienstfaktor_an_datum(self, eltern, datum): """Berechnet den Dienstfaktor eines Elternteils an einem bestimmten Datum""" if eltern not in self.alle_zeitraeume: return 0 for beginn, ende, faktor in self.alle_zeitraeume[eltern]: if beginn <= datum <= ende: return faktor return 0 def berechne_faire_zielverteilung(self): """Berechnet die faire Zielanzahl von Diensten pro Eltern-Dienst-Kombination basierend auf tatsächlich geleisteten historischen Diensten und deren fairer Umverteilung""" ziel_dienste = defaultdict(lambda: defaultdict(float)) # {eltern: {dienst: anzahl}} print("\nBerechne faire Zielverteilung basierend auf historischen Daten...") # Wenn keine historischen Daten vorhanden, nur aktuellen Monat berechnen if not self.historische_dienste: print(" Keine historischen Daten - berechne nur für aktuellen Monat") for dienst in self.dienste: benoetigte_dienste_monat = sum( 1 for tag in self.tage if dienst in self.benoetigte_dienste.get(tag, []) ) if dienst == 'A': benoetigte_dienste_monat *= 2 if benoetigte_dienste_monat > 0: gesamt_dienstfaktor_monat = sum( sum(self.dienstfaktoren.get(e, {}).get(tag, 0) for tag in self.tage) for e in self.eltern ) if gesamt_dienstfaktor_monat > 0: for eltern in self.eltern: monatsfaktor = sum( self.dienstfaktoren.get(eltern, {}).get(tag, 0) for tag in self.tage ) if monatsfaktor > 0: anteil = monatsfaktor / gesamt_dienstfaktor_monat faire_zuteilung = anteil * benoetigte_dienste_monat ziel_dienste[eltern][dienst] = faire_zuteilung return ziel_dienste # Historische Dienste nach Datum gruppieren historische_tage = set(datum for datum, _, _ in self.historische_dienste) print(f" Analysiere {len(historische_tage)} historische Tage mit {len(self.historische_dienste)} Diensten") for dienst in self.dienste: print(f" Verarbeite Dienst {dienst}...") # 1. HISTORISCHE PERIODE: Faire Umverteilung der tatsächlich geleisteten Dienste historische_dienste_dieses_typs = [ (datum, eltern) for datum, eltern, d in self.historische_dienste if d == dienst ] print(f" Gefundene historische {dienst}-Dienste: {len(historische_dienste_dieses_typs)}") # Gruppiere nach Datum dienste_pro_tag = defaultdict(list) for datum, eltern in historische_dienste_dieses_typs: dienste_pro_tag[datum].append(eltern) # Für jeden historischen Tag faire Umverteilung berechnen for hist_datum, geleistete_eltern in dienste_pro_tag.items(): anzahl_dienste = len(geleistete_eltern) # Anzahl Dienste an diesem Tag # Dienstfaktoren aller Eltern für diesen historischen Tag berechnen dienstfaktoren_tag = {} gesamt_dienstfaktor_tag = 0 for eltern in self.eltern: faktor = self.berechne_dienstfaktor_an_datum(eltern, hist_datum) dienstfaktoren_tag[eltern] = faktor gesamt_dienstfaktor_tag += faktor # Faire Umverteilung der an diesem Tag geleisteten Dienste if gesamt_dienstfaktor_tag > 0: for eltern in self.eltern: if dienstfaktoren_tag[eltern] > 0: anteil = dienstfaktoren_tag[eltern] / gesamt_dienstfaktor_tag faire_zuteilung = anteil * anzahl_dienste ziel_dienste[eltern][dienst] += faire_zuteilung if faire_zuteilung > 0.01: # Debug nur für relevante Werte print(f" {hist_datum}: {eltern} Faktor={dienstfaktoren_tag[eltern]} " f"-> {faire_zuteilung:.2f} von {anzahl_dienste} Diensten") # 2. AKTUELLER MONAT: Faire Verteilung der benötigten Dienste benoetigte_dienste_monat = sum( 1 for tag in self.tage if dienst in self.benoetigte_dienste.get(tag, []) ) if dienst == 'A': benoetigte_dienste_monat *= 2 if benoetigte_dienste_monat > 0: # Gesamtdienstfaktor für aktuellen Monat gesamt_dienstfaktor_monat = sum( sum(self.dienstfaktoren.get(e, {}).get(tag, 0) for tag in self.tage) for e in self.eltern ) if gesamt_dienstfaktor_monat > 0: for eltern in self.eltern: monatsfaktor = sum( self.dienstfaktoren.get(eltern, {}).get(tag, 0) for tag in self.tage ) if monatsfaktor > 0: anteil = monatsfaktor / gesamt_dienstfaktor_monat faire_zuteilung = anteil * benoetigte_dienste_monat ziel_dienste[eltern][dienst] += faire_zuteilung # Debug-Ausgabe für diesen Dienst total_historisch = sum( ziel_dienste[e][dienst] for e in self.eltern if dienst in [d for _, _, d in self.historische_dienste if d == dienst] ) total_aktuell = benoetigte_dienste_monat print(f" {dienst}: Historisch faire Summe={total_historisch:.1f}, " f"Aktuell benötigt={total_aktuell}") # Debug-Output: Detaillierte Zielverteilung print("\n Berechnete Zielverteilung (basierend auf tatsächlichen historischen Diensten):") for eltern in sorted(self.eltern): for dienst in self.dienste: if ziel_dienste[eltern][dienst] > 0.1: # Nur relevante Werte ist = self.vorherige_dienste[eltern][dienst] ziel = ziel_dienste[eltern][dienst] print(f" {eltern} {dienst}: IST={ist}, FAIRE_ZIEL={ziel:.2f}, DIFF={ziel-ist:.2f}") return ziel_dienste def erstelle_optimierungsmodell(self): """Erstellt das PuLP Optimierungsmodell""" print("Erstelle Optimierungsmodell...") # Debugging: Verfügbarkeit prüfen print("\nDebug: Verfügbarkeit analysieren...") for tag in self.tage[:5]: # Erste 5 Tage verfügbare = [e for e in self.eltern if self.verfügbarkeit.get((e, tag), True)] benötigte = self.benoetigte_dienste.get(tag, []) print(f" {tag}: Benötigt {len(benötigte)} Dienste {benötigte}, verfügbar: {verfügbare}") # LP Problem erstellen prob = pulp.LpProblem("Elterndienstplaner", pulp.LpMinimize) # Entscheidungsvariablen: x[eltern, tag, dienst] ∈ {0,1} x = {} for eltern in self.eltern: for tag in self.tage: for dienst in self.dienste: if dienst in self.benoetigte_dienste.get(tag, []): x[eltern, tag, dienst] = pulp.LpVariable( f"x_{eltern.replace(' ', '_')}_{tag}_{dienst}", cat='Binary' ) # Vereinfachtes Modell: Grundlegende Constraints # C1: Je Eltern und Dienst nur einmal die Woche woche_start = self.tage[0] woche_nr = 0 while woche_start <= self.tage[-1]: woche_ende = min(woche_start + timedelta(days=6), self.tage[-1]) woche_tage = [t for t in self.tage if woche_start <= t <= woche_ende] for eltern in self.eltern: for dienst in self.dienste: woche_vars = [] for tag in woche_tage: if (eltern, tag, dienst) in x: woche_vars.append(x[eltern, tag, dienst]) if woche_vars: prob += pulp.lpSum(woche_vars) <= 1, f"C1_{eltern.replace(' ', '_')}_{dienst}_w{woche_nr}" woche_start += timedelta(days=7) woche_nr += 1 # C2: Je Eltern nur einen Dienst am Tag for eltern in self.eltern: for tag in self.tage: tag_vars = [] for dienst in self.dienste: if (eltern, tag, dienst) in x: tag_vars.append(x[eltern, tag, dienst]) if tag_vars: prob += pulp.lpSum(tag_vars) <= 1, f"C2_{eltern.replace(' ', '_')}_{tag}" # C3: Dienste nur verfügbaren Eltern zuteilen for eltern in self.eltern: for tag in self.tage: if not self.verfügbarkeit.get((eltern, tag), True): for dienst in self.dienste: if (eltern, tag, dienst) in x: prob += x[eltern, tag, dienst] == 0, f"C3_{eltern.replace(' ', '_')}_{tag}_{dienst}" # Alle benötigten Dienste müssen zugeteilt werden (flexibel) for tag in self.tage: for dienst in self.benoetigte_dienste.get(tag, []): dienst_vars = [] verfuegbare_eltern = 0 for eltern in self.eltern: if (eltern, tag, dienst) in x: # Prüfe ob Eltern verfügbar if self.verfügbarkeit.get((eltern, tag), True): dienst_vars.append(x[eltern, tag, dienst]) verfuegbare_eltern += 1 if dienst_vars: # Genau 1 Person pro Dienst (außer Elternabend: genau 2) benoetigte_personen = 2 if dienst == 'A' else 1 prob += pulp.lpSum(dienst_vars) == benoetigte_personen, f"Bedarf_{tag}_{dienst}" # FAIRNESS-CONSTRAINTS UND ZIELFUNKTION objective_terms = [] # Berechne faire Zielverteilung ziel_dienste = self.berechne_faire_zielverteilung() # Hilfsvariablen für Fairness-Abweichungen fairness_abweichung_lokal = {} # F2 fairness_abweichung_global = {} # F1 for eltern in self.eltern: for dienst in self.dienste: fairness_abweichung_lokal[eltern, dienst] = pulp.LpVariable( f"fair_lokal_{eltern.replace(' ', '_')}_{dienst}", lowBound=0) fairness_abweichung_global[eltern, dienst] = pulp.LpVariable( f"fair_global_{eltern.replace(' ', '_')}_{dienst}", lowBound=0) # F1: Globale Fairness & F2: Lokale Fairness for eltern in self.eltern: for dienst in self.dienste: # Dienstfaktor für aktuellen Monat monatlicher_dienstfaktor = sum( self.dienstfaktoren.get(eltern, {}).get(tag, 0) for tag in self.tage ) if monatlicher_dienstfaktor > 0: # Tatsächliche Dienste im aktuellen Monat tatsaechliche_dienste_monat = pulp.lpSum( x[eltern, tag, dienst] for tag in self.tage if (eltern, tag, dienst) in x ) # F2: Lokale Fairness - nur aktueller Monat benoetigte_dienste_monat = sum( 1 for tag in self.tage if dienst in self.benoetigte_dienste.get(tag, []) ) if dienst == 'A': benoetigte_dienste_monat *= 2 gesamt_dienstfaktor_monat = sum( sum(self.dienstfaktoren.get(e, {}).get(tag, 0) for tag in self.tage) for e in self.eltern ) if gesamt_dienstfaktor_monat > 0 and benoetigte_dienste_monat > 0: erwartete_dienste_lokal = ( monatlicher_dienstfaktor / gesamt_dienstfaktor_monat ) * benoetigte_dienste_monat # F2: Lokale Fairness-Constraints prob += (tatsaechliche_dienste_monat - erwartete_dienste_lokal <= fairness_abweichung_lokal[eltern, dienst]) prob += (erwartete_dienste_lokal - tatsaechliche_dienste_monat <= fairness_abweichung_lokal[eltern, dienst]) # F1: Globale Fairness - basierend auf berechneter Zielverteilung ziel_gesamt = ziel_dienste[eltern][dienst] vorherige_dienste = self.vorherige_dienste[eltern][dienst] if ziel_gesamt > 0: # Tatsächliche Dienste global (Vergangenheit + geplant) total_dienste_inkl_vergangenheit = tatsaechliche_dienste_monat + vorherige_dienste # F1: Globale Fairness-Constraints prob += (total_dienste_inkl_vergangenheit - ziel_gesamt <= fairness_abweichung_global[eltern, dienst]) prob += (ziel_gesamt - total_dienste_inkl_vergangenheit <= fairness_abweichung_global[eltern, dienst]) # Gewichtung: Jahresanfang F1 stärker, Jahresende F2 stärker # Annahme: September = Jahresanfang, Juli = Jahresende aktueller_monat = self.tage[0].month if self.tage else 1 if 9 <= aktueller_monat <= 12: # Sep-Dez: Jahresanfang gewicht_f1 = 100 # Global wichtiger gewicht_f2 = 50 # Lokal weniger wichtig elif 1 <= aktueller_monat <= 3: # Jan-Mar: Jahresmitte gewicht_f1 = 75 gewicht_f2 = 75 else: # Apr-Jul: Jahresende gewicht_f1 = 50 # Global weniger wichtig gewicht_f2 = 100 # Lokal wichtiger # Fairness-Terme zur Zielfunktion hinzufügen for eltern in self.eltern: for dienst in self.dienste: objective_terms.append(gewicht_f1 * fairness_abweichung_global[eltern, dienst]) objective_terms.append(gewicht_f2 * fairness_abweichung_lokal[eltern, dienst]) # P1: Bevorzugte Dienste (positiv belohnen) for (eltern, tag, dienst), präf in self.präferenzen.items(): if (eltern, tag, dienst) in x and präf == 1: # bevorzugt objective_terms.append(-5 * x[eltern, tag, dienst]) # Schwächer als Fairness # P2: Abgelehnte Dienste (bestrafen) for (eltern, tag, dienst), präf in self.präferenzen.items(): if (eltern, tag, dienst) in x and präf == -1: # abgelehnt objective_terms.append(25 * x[eltern, tag, dienst]) # Schwächer als Fairness # Zielfunktion setzen if objective_terms: prob += pulp.lpSum(objective_terms) else: # Fallback: Minimiere Gesamtanzahl Dienste prob += pulp.lpSum([var for var in x.values()]) print(f"Verwende Gewichtung: F1 (global) = {gewicht_f1}, F2 (lokal) = {gewicht_f2}") print(f"Modell erstellt mit {len(x)} Variablen und {len(prob.constraints)} Constraints") return prob, x def löse_optimierung(self, prob, x): """Löst das Optimierungsproblem""" print("Löse Optimierungsproblem...") # Solver wählen (verfügbare Solver testen) solver = None try: solver = pulp.PULP_CBC_CMD(msg=0) # Standard CBC Solver except: try: solver = pulp.GLPK_CMD(msg=0) # GLPK falls verfügbar except: solver = None # Default Solver prob.solve(solver) status = pulp.LpStatus[prob.status] print(f"Optimierung abgeschlossen: {status}") if prob.status != pulp.LpStatusOptimal: print("WARNUNG: Keine optimale Lösung gefunden!") return None # Lösung extrahieren lösung = {} for (eltern, tag, dienst), var in x.items(): if var.varValue and var.varValue > 0.5: # Binary variable ist 1 if tag not in lösung: lösung[tag] = {} if dienst not in lösung[tag]: lösung[tag][dienst] = [] lösung[tag][dienst].append(eltern) return lösung def schreibe_ausgabe_csv(self, datei, lösung): """Schreibt die Lösung in die ausgabe.csv""" print(f"Schreibe Ergebnisse nach {datei}...") with open(datei, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) # Header schreiben header = ['Datum', 'Wochentag'] + [self.dienst_namen[d] for d in self.dienste] writer.writerow(header) # Daten schreiben for tag in sorted(self.tage): wochentag = ['Montag', 'Dienstag', 'Mittwoch', 'Donnerstag', 'Freitag', 'Samstag', 'Sonntag'][tag.weekday()] row = [tag.strftime('%Y-%m-%d'), wochentag] for dienst in self.dienste: if tag in lösung and dienst in lösung[tag]: eltern_str = ' '.join(lösung[tag][dienst]) else: eltern_str = '' row.append(eltern_str) writer.writerow(row) print("Ausgabe erfolgreich geschrieben!") def drucke_statistiken(self, lösung): """Druckt Statistiken zur Lösung""" print("\n" + "="*50) print("STATISTIKEN") print("="*50) # Dienste pro Eltern zählen dienste_pro_eltern = defaultdict(lambda: defaultdict(int)) for tag, tag_dienste in lösung.items(): for dienst, eltern_liste in tag_dienste.items(): for eltern in eltern_liste: dienste_pro_eltern[eltern][dienst] += 1 # Gesamtübersicht print("\nDienste pro Eltern:") for eltern in sorted(self.eltern): gesamt = sum(dienste_pro_eltern[eltern].values()) dienste_detail = ', '.join(f"{d}:{dienste_pro_eltern[eltern][d]}" for d in self.dienste if dienste_pro_eltern[eltern][d] > 0) print(f" {eltern:15} {gesamt:3d} ({dienste_detail})") # Dienstfaktor-Analyse print(f"\nDienstfaktoren im Planungszeitraum:") for eltern in sorted(self.eltern): faktor_summe = sum(self.dienstfaktoren.get(eltern, {}).get(tag, 0) for tag in self.tage) print(f" {eltern:15} {faktor_summe:.1f}") def main(): if len(sys.argv) < 4: print("Usage: ./elterndienstplaner.py []") sys.exit(1) eingabe_datei = sys.argv[1] eltern_datei = sys.argv[2] ausgabe_datei = sys.argv[3] vorherige_datei = sys.argv[4] if len(sys.argv) > 4 else None print("Elterndienstplaner gestartet") print("="*50) try: planer = Elterndienstplaner() # Daten laden planer.lade_eingabe_csv(eingabe_datei) planer.lade_eltern_csv(eltern_datei) if vorherige_datei: planer.lade_vorherige_ausgaben_csv(vorherige_datei) # Optimierung prob, x = planer.erstelle_optimierungsmodell() lösung = planer.löse_optimierung(prob, x) if lösung is not None: # Ergebnisse ausgeben planer.schreibe_ausgabe_csv(ausgabe_datei, lösung) planer.drucke_statistiken(lösung) print("\n✓ Planung erfolgreich abgeschlossen!") else: print("\n✗ Fehler: Keine gültige Lösung gefunden!") sys.exit(1) except Exception as e: print(f"\n✗ Fehler: {e}") sys.exit(1) if __name__ == "__main__": main()