elterndienstplaner/elterndienstplaner.py
2025-12-25 21:43:29 +01:00

571 lines
24 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Elterndienstplaner - Optimale Zuteilung von Elterndiensten
Autor: Automatisch generiert
Datum: Dezember 2025
"""
import sys
import pulp
from datetime import timedelta, date
from collections import defaultdict
from typing import Dict, List, Tuple, DefaultDict, Optional
from datenmodell import ElterndienstplanerDaten, Dienst, Eltern, Zielverteilung, Entscheidungsvariablen
from ausgabe import ElterndienstAusgabe
class Elterndienstplaner:
"""Optimierungs-Engine für Elterndienstplanung"""
def __init__(self, daten: ElterndienstplanerDaten, ausgabe: ElterndienstAusgabe) -> None:
self.daten = daten
self.ausgabe = ausgabe
def berechne_faire_zielverteilung_global(self) -> Zielverteilung:
"""Berechnet die faire Zielanzahl von Diensten für den Planungszeitraum
basierend auf globaler Fairness (Historie + aktueller Planungszeitraum).
Gibt die Ziel-Dienstanzahl für den aktuellen Planungszeitraum zurück,
korrigiert um bereits geleistete Dienste. Kann negativ sein, wenn bereits
mehr Dienste geleistet wurden als fair wäre."""
ziel_dienste: Zielverteilung = defaultdict(lambda: defaultdict(float))
print("\nBerechne faire Zielverteilung basierend auf historischen Daten...")
# Historische Dienste nach Datum gruppieren
historische_tage = set(datum for datum, _, _ in self.daten.historische_dienste) if self.daten.historische_dienste else set()
print(f" Analysiere {len(historische_tage)} historische Tage mit {len(self.daten.historische_dienste)} Diensten")
for dienst in self.daten.dienste:
print(f" Verarbeite Dienst {dienst.kuerzel}...")
# 1. HISTORISCHE PERIODE: Faire Umverteilung der tatsächlich geleisteten Dienste
historische_dienste_dieses_typs = [
(datum, eltern) for datum, eltern, d in self.daten.historische_dienste
if d == dienst
]
print(f" Gefundene historische {dienst.kuerzel}-Dienste: {len(historische_dienste_dieses_typs)}")
# Gruppiere nach Datum
dienste_pro_tag = defaultdict(list)
for datum, eltern in historische_dienste_dieses_typs:
dienste_pro_tag[datum].append(eltern)
# Für jeden historischen Tag faire Umverteilung berechnen
for tag, geleistete_eltern in dienste_pro_tag.items():
anzahl_dienste = len(geleistete_eltern) # Anzahl Dienste an diesem Tag
# Dienstfaktoren aller Eltern für diesen historischen Tag berechnen
gesamt_dienstfaktor_tag = 0
for eltern in self.daten.eltern:
gesamt_dienstfaktor_tag += self.daten.dienstfaktoren[eltern][tag]
# Faire Umverteilung der an diesem Tag geleisteten Dienste
if gesamt_dienstfaktor_tag > 0:
for eltern in self.daten.eltern:
if self.daten.dienstfaktoren[eltern][tag] > 0:
anteil = self.daten.dienstfaktoren[eltern][tag] / gesamt_dienstfaktor_tag
faire_zuteilung = anteil * anzahl_dienste
ziel_dienste[eltern][dienst] += faire_zuteilung
if faire_zuteilung > 0.01: # Debug nur für relevante Werte
print(f" {tag}: {eltern} Faktor={self.daten.dienstfaktoren[eltern][tag]} "
f"-> {faire_zuteilung:.2f} von {anzahl_dienste} Diensten")
# 2. AKTUELLER PLANUNGSZEITRAUM: Faire Verteilung der benötigten Dienste (tageweise wie bei historischen Diensten)
benoetigte_dienste_planungszeitraum = 0
# Für jeden Tag im aktuellen Planungszeitraum faire Umverteilung berechnen
for tag in self.daten.planungszeitraum:
# Prüfe ob an diesem Tag der Dienst benötigt wird
if dienst not in self.daten.benoetigte_dienste.get(tag, []):
continue
benoetigte_dienste_planungszeitraum += dienst.personen_anzahl
# Dienstfaktoren aller Eltern für diesen Tag berechnen
dienstfaktoren = {}
gesamt_dienstfaktor_tag = 0
for eltern in self.daten.eltern:
faktor = self.daten.dienstfaktoren[eltern][tag]
dienstfaktoren[eltern] = faktor
gesamt_dienstfaktor_tag += faktor
# Faire Umverteilung der an diesem Tag benötigten Dienste
if gesamt_dienstfaktor_tag > 0:
for eltern in self.daten.eltern:
anteil = dienstfaktoren[eltern] / gesamt_dienstfaktor_tag
faire_zuteilung = anteil * dienst.personen_anzahl
ziel_dienste[eltern][dienst] += faire_zuteilung
# 3. ABZUG DER BEREITS GELEISTETEN DIENSTE
# Ziehe die tatsächlich geleisteten Dienste ab, um das Ziel für den Planungszeitraum zu erhalten
for eltern in self.daten.eltern:
# Berechne vorherige Dienste on-the-fly aus historischen Diensten
vorherige_anzahl = sum(
1 for _, hist_eltern, hist_dienst in self.daten.historische_dienste
if hist_eltern == eltern and hist_dienst == dienst
)
ziel_dienste[eltern][dienst] -= vorherige_anzahl
return ziel_dienste
def berechne_faire_zielverteilung_lokal(self) -> Zielverteilung:
"""Berechnet die lokale faire Zielanzahl von Diensten pro Eltern-Dienst-Kombination
basierend auf Dienstfaktoren und benötigten Diensten im aktuellen Planungszeitraum"""
ziel_dienste_lokal: Zielverteilung = defaultdict(lambda: defaultdict(float))
print("\nBerechne lokale faire Zielverteilung für aktuellen Planungszeitraum...")
# Gesamtdienstfaktor für aktuellen Planungszeitraum berechnen
summe_dienstfaktor_planungszeitraum_alle_eltern = sum(
sum(self.daten.dienstfaktoren[e][tag] for tag in self.daten.planungszeitraum)
for e in self.daten.eltern
)
if summe_dienstfaktor_planungszeitraum_alle_eltern == 0:
print(" WARNUNG: Gesamtdienstfaktor ist 0, keine lokale Zielverteilung möglich")
return ziel_dienste_lokal
# Für jeden Dienst die lokale faire Verteilung berechnen
for dienst in self.daten.dienste:
# Anzahl benötigter Dienste im aktuellen Planungszeitraum
benoetigte_dienste_planungszeitraum = sum(
1 for tag in self.daten.planungszeitraum
if dienst in self.daten.benoetigte_dienste.get(tag, [])
)
# Multipliziere mit Anzahl benötigter Personen pro Dienst
benoetigte_dienste_planungszeitraum *= dienst.personen_anzahl
if benoetigte_dienste_planungszeitraum > 0:
print(f" {dienst.kuerzel}: {benoetigte_dienste_planungszeitraum} Dienste benötigt")
for eltern in self.daten.eltern:
# Dienstfaktor für diesen Elternteil im aktuellen Planungszeitraum
summe_dienstfaktor_planungszeitraum = sum(
self.daten.dienstfaktoren[eltern][tag] for tag in self.daten.planungszeitraum
)
if summe_dienstfaktor_planungszeitraum > 0:
anteil = summe_dienstfaktor_planungszeitraum / summe_dienstfaktor_planungszeitraum_alle_eltern
faire_zuteilung = anteil * benoetigte_dienste_planungszeitraum
ziel_dienste_lokal[eltern][dienst] = faire_zuteilung
return ziel_dienste_lokal
def _erstelle_entscheidungsvariablen(self) -> Entscheidungsvariablen:
"""Erstellt die binären Entscheidungsvariablen x[eltern, tag, dienst]"""
x: Entscheidungsvariablen = {}
for eltern in self.daten.eltern:
for tag in self.daten.planungszeitraum:
for dienst in self.daten.dienste:
if dienst in self.daten.benoetigte_dienste.get(tag, []):
x[eltern, tag, dienst] = pulp.LpVariable(
f"x_{eltern.replace(' ', '_')}_{tag}_{dienst.kuerzel}",
cat='Binary'
)
return x
def _add_constraint_ein_dienst_pro_woche(
self,
prob: pulp.LpProblem,
x: Entscheidungsvariablen
) -> None:
"""C1: Je Eltern und Dienst nur einmal die Woche (Woche = Montag bis Sonntag)"""
erster_tag = self.daten.planungszeitraum[0]
# weekday(): 0=Montag, 6=Sonntag
# Finde Montag am oder vor dem ersten Planungstag (für historische Dienste)
woche_start = erster_tag - timedelta(days=erster_tag.weekday())
woche_nr = 0
letzter_tag = self.daten.planungszeitraum[-1]
while woche_start <= letzter_tag:
woche_ende = woche_start + timedelta(days=6) # Sonntag
for eltern in self.daten.eltern:
for dienst in self.daten.dienste:
woche_vars = []
# Zähle historische Dienste in dieser Woche (VOR dem Planungszeitraum)
historische_dienste_in_woche = 0
if woche_start < erster_tag:
for hist_datum, hist_eltern, hist_dienst in self.daten.historische_dienste:
if (hist_eltern == eltern and
hist_dienst == dienst and
woche_start <= hist_datum < erster_tag):
historische_dienste_in_woche += 1
# Sammle Variablen für Planungszeitraum in dieser Woche
for tag in self.daten.planungszeitraum:
if woche_start <= tag <= woche_ende:
if (eltern, tag, dienst) in x:
woche_vars.append(x[eltern, tag, dienst])
# Constraint: Historische + geplante Dienste <= 1
if woche_vars:
prob += pulp.lpSum(woche_vars) <= 1 - historische_dienste_in_woche, \
f"C1_{eltern.replace(' ', '_')}_{dienst.kuerzel}_w{woche_nr}"
woche_start += timedelta(days=7)
woche_nr += 1
def _add_constraint_ein_dienst_pro_tag(
self,
prob: pulp.LpProblem,
x: Entscheidungsvariablen
) -> None:
"""C2: Je Eltern nur einen Dienst am Tag"""
for eltern in self.daten.eltern:
for tag in self.daten.planungszeitraum:
tag_vars = []
for dienst in self.daten.dienste:
if (eltern, tag, dienst) in x:
tag_vars.append(x[eltern, tag, dienst])
if tag_vars:
prob += pulp.lpSum(tag_vars) <= 1, f"C2_{eltern.replace(' ', '_')}_{tag}"
def _add_constraint_verfuegbarkeit(
self,
prob: pulp.LpProblem,
x: Entscheidungsvariablen
) -> None:
"""C3: Dienste nur verfügbaren Eltern zuteilen"""
for eltern in self.daten.eltern:
for tag in self.daten.planungszeitraum:
if not self.daten.verfügbarkeit.get((eltern, tag), True):
for dienst in self.daten.dienste:
if (eltern, tag, dienst) in x:
prob += x[eltern, tag, dienst] == 0, \
f"C3_{eltern.replace(' ', '_')}_{tag}_{dienst.kuerzel}"
def _add_constraint_dienst_bedarf(
self,
prob: pulp.LpProblem,
x: Entscheidungsvariablen
) -> None:
"""C4: Alle benötigten Dienste müssen zugeteilt werden"""
for tag in self.daten.planungszeitraum:
for dienst in self.daten.benoetigte_dienste.get(tag, []):
dienst_vars = []
for eltern in self.daten.eltern:
if (eltern, tag, dienst) in x:
# Prüfe ob Eltern verfügbar
if self.daten.verfügbarkeit.get((eltern, tag), True):
dienst_vars.append(x[eltern, tag, dienst])
if dienst_vars:
# Anzahl benötigter Personen pro Dienst (aus Dienst-Objekt)
benoetigte_personen = dienst.personen_anzahl
prob += pulp.lpSum(dienst_vars) == benoetigte_personen, \
f"Bedarf_{tag}_{dienst.kuerzel}"
def _add_fairness_constraints(
self,
prob: pulp.LpProblem,
x: Entscheidungsvariablen,
ziel_dienste: Zielverteilung,
constraint_prefix: str
) -> Dict:
"""Erstellt Fairness-Variablen und fügt Fairness-Constraints hinzu
Args:
prob: Das LP-Problem
x: Die Entscheidungsvariablen
ziel_dienste: Die Zielverteilung der Dienste
constraint_prefix: Präfix für Constraint-Namen ('lokal' oder 'global')
Returns:
Dictionary mit Fairness-Abweichungsvariablen
"""
# Hilfsvariablen für Fairness-Abweichungen erstellen
fairness_abweichung = {}
for eltern in self.daten.eltern:
for dienst in self.daten.dienste:
fairness_abweichung[eltern, dienst] = pulp.LpVariable(
f"fair_{constraint_prefix}_{eltern.replace(' ', '_')}_{dienst.kuerzel}",
lowBound=0)
# Fairness-Constraints hinzufügen
for eltern in self.daten.eltern:
for dienst in self.daten.dienste:
# Tatsächliche Dienste im aktuellen Planungszeitraum
zugeteilte_dienste_planungszeitraum = pulp.lpSum(
x[eltern, tag, dienst]
for tag in self.daten.planungszeitraum
if (eltern, tag, dienst) in x
)
# Ziel für diese Fairness-Variante
ziel = ziel_dienste[eltern][dienst]
prob += (zugeteilte_dienste_planungszeitraum - ziel <=
fairness_abweichung[eltern, dienst])
prob += (ziel - zugeteilte_dienste_planungszeitraum <=
fairness_abweichung[eltern, dienst])
return fairness_abweichung
def _add_constraint_gesamtfairness(
self,
prob: pulp.LpProblem,
x: Entscheidungsvariablen,
ziel_dienste: Zielverteilung,
constraint_prefix: str
) -> Dict:
"""F3: Dienstübergreifende Fairness - verhindert Häufung bei einzelnen Eltern
Berechnet die Abweichung der Gesamtdienstanzahl (über alle Diensttypen)
vom fairen Gesamtziel. Dies verhindert, dass einzelne Eltern über alle
Diensttypen hinweg überproportional viele Dienste bekommen.
Args:
prob: Das LP-Problem
x: Die Entscheidungsvariablen
ziel_dienste: Die Zielverteilung (global oder lokal)
constraint_prefix: Präfix für Constraint-Namen ('lokal' oder 'global')
Returns:
Dictionary mit Gesamt-Fairness-Abweichungsvariablen
"""
fairness_abweichung_gesamt = {}
for eltern in self.daten.eltern:
fairness_abweichung_gesamt[eltern] = pulp.LpVariable(
f"fair_gesamt_{constraint_prefix}_{eltern.replace(' ', '_')}",
lowBound=0)
# Tatsächliche Gesamtdienste für diesen Elternteil
tatsaechliche_dienste_gesamt = pulp.lpSum(
x[eltern, tag, dienst]
for tag in self.daten.planungszeitraum
for dienst in self.daten.dienste
if (eltern, tag, dienst) in x
)
# Ziel-Gesamtdienste für diesen Elternteil (Summe über alle Dienste)
ziel_gesamt = sum(ziel_dienste[eltern][dienst] for dienst in self.daten.dienste)
# Fairness-Constraints
prob += (tatsaechliche_dienste_gesamt - ziel_gesamt <=
fairness_abweichung_gesamt[eltern])
prob += (ziel_gesamt - tatsaechliche_dienste_gesamt <=
fairness_abweichung_gesamt[eltern])
return fairness_abweichung_gesamt
def _erstelle_zielfunktion(
self,
prob: pulp.LpProblem,
x: Entscheidungsvariablen,
fairness_abweichung_lokal: Dict,
fairness_abweichung_global: Dict,
fairness_abweichung_gesamt_global: Dict,
fairness_abweichung_gesamt_lokal: Dict
) -> None:
"""Erstellt die Zielfunktion mit Fairness und Präferenzen"""
objective_terms = []
# Fairness-Gewichtung
gewicht_global = 40
gewicht_lokal = 60
gewicht_f1 = gewicht_global
gewicht_f2 = gewicht_lokal
gewicht_f3_global = 0.25 * gewicht_global
gewicht_f3_lokal = 0.25 * gewicht_lokal
# Fairness-Terme zur Zielfunktion hinzufügen
for eltern in self.daten.eltern:
for dienst in self.daten.dienste:
objective_terms.append(gewicht_f1 * fairness_abweichung_global[eltern, dienst])
objective_terms.append(gewicht_f2 * fairness_abweichung_lokal[eltern, dienst])
# F3: Gesamtfairness (dienstübergreifend) - global und lokal
objective_terms.append(gewicht_f3_global * fairness_abweichung_gesamt_global[eltern])
objective_terms.append(gewicht_f3_lokal * fairness_abweichung_gesamt_lokal[eltern])
# P1: Bevorzugte Dienste (positiv belohnen)
for (eltern, tag, dienst), präf in self.daten.präferenzen.items():
if (eltern, tag, dienst) in x and präf == 1: # bevorzugt
objective_terms.append(-5 * x[eltern, tag, dienst])
# P2: Abgelehnte Dienste (bestrafen)
for (eltern, tag, dienst), präf in self.daten.präferenzen.items():
if (eltern, tag, dienst) in x and präf == -1: # abgelehnt
objective_terms.append(25 * x[eltern, tag, dienst])
# Zielfunktion setzen
if objective_terms:
prob += pulp.lpSum(objective_terms)
else:
# Fallback: Minimiere Gesamtanzahl Dienste
prob += pulp.lpSum([var for var in x.values()])
print(f"Verwende Gewichtung: F1 (global) = {gewicht_f1}, F2 (lokal) = {gewicht_f2}, "
f"F3_global = {gewicht_f3_global}, F3_lokal = {gewicht_f3_lokal}")
def erstelle_optimierungsmodell(self) -> Tuple[
pulp.LpProblem,
Entscheidungsvariablen
]:
"""Erstellt das PuLP Optimierungsmodell
Returns:
Tuple mit (prob, x, ziel_dienste_lokal, ziel_dienste_global)
"""
print("Erstelle Optimierungsmodell...")
# Debugging: Verfügbarkeit prüfen
print("\nDebug: Verfügbarkeit analysieren...")
for tag in self.daten.planungszeitraum[:5]: # Erste 5 Tage
verfügbare = [e for e in self.daten.eltern if self.daten.verfügbarkeit.get((e, tag), True)]
benötigte = self.daten.benoetigte_dienste.get(tag, [])
print(f" {tag}: Benötigt {len(benötigte)} Dienste {benötigte}, verfügbar: {verfügbare}")
# LP Problem erstellen
prob = pulp.LpProblem("Elterndienstplaner", pulp.LpMinimize)
# Entscheidungsvariablen erstellen
x = self._erstelle_entscheidungsvariablen()
# Grundlegende Constraints hinzufügen
self._add_constraint_ein_dienst_pro_woche(prob, x)
self._add_constraint_ein_dienst_pro_tag(prob, x)
self._add_constraint_verfuegbarkeit(prob, x)
self._add_constraint_dienst_bedarf(prob, x)
# Fairness-Constraints
ziel_dienste_global = self.berechne_faire_zielverteilung_global()
ziel_dienste_lokal = self.berechne_faire_zielverteilung_lokal()
# Observer Pattern: Notify ausgabe about target distributions
self.ausgabe.setze_zielverteilungen(ziel_dienste_lokal, ziel_dienste_global)
# F2: Lokale Fairness-Constraints
fairness_abweichung_lokal = self._add_fairness_constraints(
prob, x, ziel_dienste_lokal, "lokal"
)
# F1: Globale Fairness-Constraints
fairness_abweichung_global = self._add_fairness_constraints(
prob, x, ziel_dienste_global, "global"
)
# F3: Dienstübergreifende Fairness - Global
fairness_abweichung_gesamt_global = self._add_constraint_gesamtfairness(
prob, x, ziel_dienste_global, "global"
)
# F3: Dienstübergreifende Fairness - Lokal
fairness_abweichung_gesamt_lokal = self._add_constraint_gesamtfairness(
prob, x, ziel_dienste_lokal, "lokal"
)
# Zielfunktion erstellen
self._erstelle_zielfunktion(prob, x, fairness_abweichung_lokal, fairness_abweichung_global,
fairness_abweichung_gesamt_global, fairness_abweichung_gesamt_lokal)
print(f"Modell erstellt mit {len(x)} Variablen und {len(prob.constraints)} Constraints")
return prob, x
def löse_optimierung(self, prob: pulp.LpProblem,
x: Entscheidungsvariablen) -> Optional[Dict[date, Dict[Dienst, List[Eltern]]]]:
"""Löst das Optimierungsproblem"""
print("Löse Optimierungsproblem...")
# Solver wählen (verfügbare Solver testen)
solver = None
try:
print("Versuche CBC Solver...")
solver = pulp.PULP_CBC_CMD(msg=0, timeLimit=10) # Standard CBC Solver
except:
try:
print("Versuche GLPK Solver...")
solver = pulp.GLPK_CMD(msg=0) # GLPK falls verfügbar
except:
print("Kein spezifizierter Solver verfügbar, verwende Standard.")
solver = None # Default Solver
prob.solve(solver)
status = pulp.LpStatus[prob.status]
print(f"Optimierung abgeschlossen: {status}")
if prob.status != pulp.LpStatusOptimal:
print("WARNUNG: Keine optimale Lösung gefunden!")
return None
# Lösung extrahieren
lösung: Dict[date, Dict[Dienst, List[Eltern]]] = {}
for (eltern, tag, dienst), var in x.items():
if var.varValue and var.varValue > 0.5: # Binary variable ist 1
if tag not in lösung:
lösung[tag] = {}
if dienst not in lösung[tag]:
lösung[tag][dienst] = []
lösung[tag][dienst].append(eltern)
return lösung
def main() -> None:
if len(sys.argv) < 4:
print("Usage: ./elterndienstplaner.py <eingabe.csv> <eltern.csv> <ausgabe.csv> [<vorherige-ausgaben.csv>]")
sys.exit(1)
eingabe_datei = sys.argv[1]
eltern_datei = sys.argv[2]
ausgabe_datei = sys.argv[3]
vorherige_datei = sys.argv[4] if len(sys.argv) > 4 else None
print("Elterndienstplaner gestartet")
print("="*50)
try:
# Create data model and load data
daten = ElterndienstplanerDaten()
daten.lade_daten(eingabe_datei, eltern_datei, vorherige_datei)
# Create output handler and optimization engine
ausgabe = ElterndienstAusgabe(daten)
planer = Elterndienstplaner(daten, ausgabe)
# Optimierung
prob, x = planer.erstelle_optimierungsmodell()
lösung = planer.löse_optimierung(prob, x)
if lösung is not None:
# Ergebnisse ausgeben
ausgabe.schreibe_ausgabe_csv(ausgabe_datei, lösung)
ausgabe.drucke_statistiken(lösung)
# Visualisierung der Verteilungen (uses Observer Pattern targets)
ausgabe.visualisiere_verteilungen(lösung)
# Visualisierung der Präferenz-Verletzungen
ausgabe.visualisiere_praeferenz_verletzungen(lösung)
print("\n✓ Planung erfolgreich abgeschlossen!")
else:
print("\n✗ Fehler: Keine gültige Lösung gefunden!")
sys.exit(1)
except Exception as e:
print(f"\n✗ Fehler: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
main()