elterndienstplaner/elterndienstplaner.py

565 lines
24 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Elterndienstplaner - Optimale Zuteilung von Elterndiensten
Autor: Automatisch generiert
Datum: Dezember 2025
"""
import sys
import pulp
import multiprocessing
from datetime import timedelta, date
from collections import defaultdict
from typing import Dict, List, Tuple, DefaultDict, Optional
from datenmodell import ElterndienstplanerDaten, Dienst, Eltern, Zielverteilung, Entscheidungsvariablen
from ausgabe import ElterndienstAusgabe
class Elterndienstplaner:
"""Optimierungs-Engine fuer Elterndienstplanung"""
def __init__(self, daten: ElterndienstplanerDaten, ausgabe: ElterndienstAusgabe) -> None:
self.daten = daten
self.ausgabe = ausgabe
def berechne_faire_zielverteilung_global(self) -> Zielverteilung:
"""Berechnet die faire Zielanzahl von Diensten fuer den Planungszeitraum
basierend auf globaler Fairness (Historie + aktueller Planungszeitraum).
Gibt die Ziel-Dienstanzahl fuer den aktuellen Planungszeitraum zurueck,
korrigiert um bereits geleistete Dienste. Kann negativ sein, wenn bereits
mehr Dienste geleistet wurden als fair waere."""
ziel_dienste: Zielverteilung = defaultdict(lambda: defaultdict(float))
print("\nBerechne faire Zielverteilung basierend auf historischen Daten...")
historische_tage = set(datum for datum, _, _ in self.daten.historische_dienste) if self.daten.historische_dienste else set()
print(f" Analysiere {len(historische_tage)} historische Tage mit {len(self.daten.historische_dienste)} Diensten")
for dienst in self.daten.dienste:
print(f" Verarbeite Dienst {dienst.kuerzel}...")
# 1. HISTORISCHE PERIODE: Faire Umverteilung
historische_dienste_dieses_typs = [
(datum, eltern) for datum, eltern, d in self.daten.historische_dienste
if d == dienst
]
print(f" Gefundene historische {dienst.kuerzel}-Dienste: {len(historische_dienste_dieses_typs)}")
dienste_pro_tag = defaultdict(list)
for datum, eltern in historische_dienste_dieses_typs:
dienste_pro_tag[datum].append(eltern)
for tag, geleistete_eltern in dienste_pro_tag.items():
anzahl_dienste = len(geleistete_eltern)
gesamt_dienstfaktor_tag = 0
for eltern in self.daten.eltern:
gesamt_dienstfaktor_tag += self.daten.dienstfaktoren[eltern][tag]
if gesamt_dienstfaktor_tag > 0:
for eltern in self.daten.eltern:
if self.daten.dienstfaktoren[eltern][tag] > 0:
anteil = self.daten.dienstfaktoren[eltern][tag] / gesamt_dienstfaktor_tag
faire_zuteilung = anteil * anzahl_dienste
ziel_dienste[eltern][dienst] += faire_zuteilung
#if faire_zuteilung > 0.01:
# print(f" {tag}: {eltern} Faktor={self.daten.dienstfaktoren[eltern][tag]} "
# f"-> {faire_zuteilung:.2f} von {anzahl_dienste} Diensten")
# 2. AKTUELLER PLANUNGSZEITRAUM: Faire Verteilung
benoetigte_dienste_planungszeitraum = 0
for tag in self.daten.planungszeitraum:
if dienst not in self.daten.benoetigte_dienste.get(tag, []):
continue
benoetigte_dienste_planungszeitraum += dienst.personen_anzahl
dienstfaktoren = {}
gesamt_dienstfaktor_tag = 0
for eltern in self.daten.eltern:
faktor = self.daten.dienstfaktoren[eltern][tag]
dienstfaktoren[eltern] = faktor
gesamt_dienstfaktor_tag += faktor
if gesamt_dienstfaktor_tag > 0:
for eltern in self.daten.eltern:
anteil = dienstfaktoren[eltern] / gesamt_dienstfaktor_tag
faire_zuteilung = anteil * dienst.personen_anzahl
ziel_dienste[eltern][dienst] += faire_zuteilung
# 3. ABZUG DER BEREITS GELEISTETEN DIENSTE
for eltern in self.daten.eltern:
vorherige_anzahl = sum(
1 for _, hist_eltern, hist_dienst in self.daten.historische_dienste
if hist_eltern == eltern and hist_dienst == dienst
)
ziel_dienste[eltern][dienst] -= vorherige_anzahl
return ziel_dienste
def berechne_faire_zielverteilung_lokal(self) -> Zielverteilung:
"""Berechnet die lokale faire Zielanzahl von Diensten pro Eltern-Dienst-Kombination
basierend auf Dienstfaktoren und benoetigten Diensten im aktuellen Planungszeitraum
WICHTIG: Bei der lokalen Fairness werden Abwesenheitstage NICHT in die Dienstpflicht
eingerechnet (Dienstfaktor = 0 an Abwesenheitstagen). Das führt zu einer gleichmäßigeren
Verteilung im aktuellen Monat und verhindert, dass Familien mit längeren Abwesenheiten
in den wenigen verfügbaren Tagen überproportional viele Dienste bekommen.
Die "verpassten" Dienste werden dann über die globale Fairness (F1) im Jahresverlauf
ausgeglichen."""
ziel_dienste_lokal: Zielverteilung = defaultdict(lambda: defaultdict(float))
print("\nBerechne lokale faire Zielverteilung für aktuellen Planungszeitraum...")
print(" (Abwesenheitstage werden aus der Dienstpflicht herausgerechnet)")
summe_dienstfaktor_planungszeitraum_alle_eltern = sum(
sum(self.daten.dienstfaktoren[e][tag] for tag in self.daten.planungszeitraum)
for e in self.daten.eltern
)
if summe_dienstfaktor_planungszeitraum_alle_eltern == 0:
print(" WARNUNG: Gesamtdienstfaktor ist 0, keine lokale Zielverteilung möglich")
return ziel_dienste_lokal
for dienst in self.daten.dienste:
benoetigte_dienste_planungszeitraum = sum(
1 for tag in self.daten.planungszeitraum
if dienst in self.daten.benoetigte_dienste.get(tag, [])
)
benoetigte_dienste_planungszeitraum *= dienst.personen_anzahl
if benoetigte_dienste_planungszeitraum > 0:
print(f" {dienst.kuerzel}: {benoetigte_dienste_planungszeitraum} Dienste benötigt")
for eltern in self.daten.eltern:
summe_dienstfaktor_planungszeitraum = sum(
self.daten.dienstfaktoren[eltern][tag] for tag in self.daten.planungszeitraum
)
if summe_dienstfaktor_planungszeitraum > 0:
anteil = summe_dienstfaktor_planungszeitraum / summe_dienstfaktor_planungszeitraum_alle_eltern
faire_zuteilung = anteil * benoetigte_dienste_planungszeitraum
ziel_dienste_lokal[eltern][dienst] = faire_zuteilung
return ziel_dienste_lokal
def _erstelle_entscheidungsvariablen(self) -> Entscheidungsvariablen:
"""Erstellt die binaeren Entscheidungsvariablen x[eltern, tag, dienst]"""
x: Entscheidungsvariablen = {}
for eltern in self.daten.eltern:
for tag in self.daten.planungszeitraum:
for dienst in self.daten.dienste:
if dienst in self.daten.benoetigte_dienste.get(tag, []):
x[eltern, tag, dienst] = pulp.LpVariable(
f"x_{eltern.replace(' ', '_')}_{tag}_{dienst.kuerzel}",
cat='Binary'
)
return x
def _add_constraint_ein_dienst_pro_woche(
self,
prob: pulp.LpProblem,
x: Entscheidungsvariablen
) -> None:
"""C1: Je Eltern und Dienst nur einmal die Woche (Woche = Montag bis Sonntag)"""
erster_tag = self.daten.planungszeitraum[0]
# Finde Montag am oder vor dem ersten Planungstag
woche_start = erster_tag - timedelta(days=erster_tag.weekday())
woche_nr = 0
letzter_tag = self.daten.planungszeitraum[-1]
print ("\n Erster Tag im Planungszeitraum:", erster_tag)
print ("\n Letzter Tag im Planungszeitraum:", letzter_tag)
while woche_start <= letzter_tag:
woche_ende = woche_start + timedelta(days=6)
for eltern in self.daten.eltern:
for dienst in self.daten.dienste:
woche_vars = []
# Zaehle historische Dienste in dieser Woche (VOR Planungszeitraum)
historische_dienste_in_woche = 0
#if woche_start < erster_tag:
for hist_datum, hist_eltern, hist_dienst in self.daten.historische_dienste:
if (hist_eltern == eltern and
hist_dienst == dienst and
woche_start <= hist_datum and
hist_datum <= woche_ende):
historische_dienste_in_woche += 1
for tag in self.daten.planungszeitraum:
if woche_start <= tag <= woche_ende:
if (eltern, tag, dienst) in x:
woche_vars.append(x[eltern, tag, dienst])
if woche_vars:
if (1 - historische_dienste_in_woche) >= 0:
prob += pulp.lpSum(woche_vars) <= 1 - historische_dienste_in_woche, \
f"C1_{eltern.replace(' ', '_')}_{dienst.kuerzel}_w{woche_nr}"
else:
print (f" Hinweis: {eltern} hat in Woche {woche_nr} bereits {historische_dienste_in_woche} mal {dienst.name}")
woche_start += timedelta(days=7)
woche_nr += 1
def _add_constraint_ein_dienst_pro_tag(
self,
prob: pulp.LpProblem,
x: Entscheidungsvariablen
) -> None:
"""C2: Je Eltern nur einen Dienst am Tag"""
for eltern in self.daten.eltern:
for tag in self.daten.planungszeitraum:
tag_vars = []
maximum = 1
for dienst in self.daten.dienste:
if (eltern, tag, dienst) in x:
tag_vars.append(x[eltern, tag, dienst])
for hist_datum, hist_eltern, hist_dienst in self.daten.historische_dienste:
if (hist_eltern == eltern and
hist_datum == tag):
maximum = 0
if tag_vars:
prob += pulp.lpSum(tag_vars) <= maximum, f"C2_{eltern.replace(' ', '_')}_{tag}"
def _add_constraint_verfuegbarkeit(
self,
prob: pulp.LpProblem,
x: Entscheidungsvariablen
) -> None:
"""C3: Dienste nur verfuegbaren Eltern zuteilen"""
for eltern in self.daten.eltern:
for tag in self.daten.planungszeitraum:
if not self.daten.verfuegbarkeit.get((eltern, tag), True):
for dienst in self.daten.dienste:
if (eltern, tag, dienst) in x:
prob += x[eltern, tag, dienst] == 0, \
f"C3_{eltern.replace(' ', '_')}_{tag}_{dienst.kuerzel}"
def _add_constraint_dienst_bedarf(
self,
prob: pulp.LpProblem,
x: Entscheidungsvariablen
) -> None:
"""C4: Alle benoetigten Dienste muessen zugeteilt werden"""
for tag in self.daten.planungszeitraum:
for dienst in self.daten.benoetigte_dienste.get(tag, []):
dienst_vars = []
for eltern in self.daten.eltern:
if (eltern, tag, dienst) in x:
if self.daten.verfuegbarkeit.get((eltern, tag), True):
dienst_vars.append(x[eltern, tag, dienst])
if dienst_vars:
# Anzahl benoetigter Personen pro Dienst
benoetigte_personen = dienst.personen_anzahl
prob += pulp.lpSum(dienst_vars) == benoetigte_personen, \
f"Bedarf_{tag}_{dienst.kuerzel}"
def _add_constraint_fairness_diensttypspezifisch(
self,
prob: pulp.LpProblem,
x: Entscheidungsvariablen,
ziel_dienste: Zielverteilung,
constraint_prefix: str
) -> Dict:
"""F1/F2: Fairness pro Diensttyp - gleicht Anzahl je Diensttyp aus
Berechnet die Abweichung der zugeteilten Dienste vom fairen Ziel
fuer jeden Diensttyp separat. Dies sorgt dafuer, dass z.B. Kochdienste
und Putzdienste jeweils fair verteilt werden.
F1 (global): Basierend auf historischen Daten + aktuellem Planungszeitraum
F2 (lokal): Nur fuer den aktuellen Planungszeitraum
Args:
prob: Das LP-Problem
x: Die Entscheidungsvariablen
ziel_dienste: Die Zielverteilung (global oder lokal)
constraint_prefix: Praefix fuer Constraint-Namen ('global' fuer F1, 'lokal' fuer F2)
Returns:
Dictionary mit Fairness-Abweichungsvariablen pro Diensttyp
"""
fairness_abweichung = {}
for eltern in self.daten.eltern:
for dienst in self.daten.dienste:
fairness_abweichung[eltern, dienst] = pulp.LpVariable(
f"fair_{constraint_prefix}_{eltern.replace(' ', '_')}_{dienst.kuerzel}",
lowBound=0)
for eltern in self.daten.eltern:
for dienst in self.daten.dienste:
zugeteilte_dienste_planungszeitraum = pulp.lpSum(
x[eltern, tag, dienst]
for tag in self.daten.planungszeitraum
if (eltern, tag, dienst) in x
)
ziel = ziel_dienste[eltern][dienst]
prob += (zugeteilte_dienste_planungszeitraum - ziel <=
fairness_abweichung[eltern, dienst])
prob += (ziel - zugeteilte_dienste_planungszeitraum <=
fairness_abweichung[eltern, dienst])
return fairness_abweichung
def _add_constraint_fairness_typuebergreifend(
self,
prob: pulp.LpProblem,
x: Entscheidungsvariablen,
ziel_dienste: Zielverteilung,
constraint_prefix: str
) -> Dict:
"""F3/F4: Diensttypuebergreifende Fairness - verhindert Haeufung bei einzelnen Familien
Berechnet die Abweichung der Gesamtdienstanzahl (ueber alle Diensttypen)
vom fairen Gesamtziel. Dies verhindert, dass einzelne Familien ueber alle
Diensttypen hinweg ueberproportional viele Dienste bekommen.
F3 (global): Basierend auf historischen Daten + aktuellem Planungszeitraum
F4 (lokal): Nur fuer den aktuellen Planungszeitraum
Args:
prob: Das LP-Problem
x: Die Entscheidungsvariablen
ziel_dienste: Die Zielverteilung (global oder lokal)
constraint_prefix: Praefix fuer Constraint-Namen ('global' fuer F3, 'lokal' fuer F4)
Returns:
Dictionary mit Gesamt-Fairness-Abweichungsvariablen ueber alle Diensttypen
"""
fairness_abweichung_gesamt = {}
for eltern in self.daten.eltern:
fairness_abweichung_gesamt[eltern] = pulp.LpVariable(
f"fair_gesamt_{constraint_prefix}_{eltern.replace(' ', '_')}",
lowBound=0)
# Zähle tatsächliche Dienste gewichtet mit dem Aufwand des Dienstes
tatsaechliche_dienste_gesamt = pulp.lpSum(
dienst.aufwand * x[eltern, tag, dienst]
for tag in self.daten.planungszeitraum
for dienst in self.daten.dienste
if (eltern, tag, dienst) in x
)
# Zielgesamt ebenfalls mit Dienst-Aufwand gewichtet
ziel_gesamt = sum(ziel_dienste[eltern][dienst] * dienst.aufwand for dienst in self.daten.dienste)
prob += (tatsaechliche_dienste_gesamt - ziel_gesamt <=
fairness_abweichung_gesamt[eltern])
prob += (ziel_gesamt - tatsaechliche_dienste_gesamt <=
fairness_abweichung_gesamt[eltern])
return fairness_abweichung_gesamt
def _erstelle_zielfunktion(
self,
prob: pulp.LpProblem,
x: Entscheidungsvariablen,
fairness_abweichung_lokal: Dict,
fairness_abweichung_global: Dict,
fairness_abweichung_gesamt_global: Dict,
fairness_abweichung_gesamt_lokal: Dict
) -> None:
"""Erstellt die Zielfunktion mit Fairness und Praeferenzen"""
objective_terms = []
gewicht_global = 40
gewicht_lokal = 60
gewicht_f1 = gewicht_global
gewicht_f2 = gewicht_lokal
gewicht_f3_global = 0.25 * gewicht_global
gewicht_f4_lokal = 0.25 * gewicht_lokal
for eltern in self.daten.eltern:
for dienst in self.daten.dienste:
# Skaliere diensttyp-spezifische Fairness mit dem Aufwand des Dienstes
objective_terms.append(gewicht_f1 * fairness_abweichung_global[eltern, dienst] * dienst.aufwand)
objective_terms.append(gewicht_f2 * fairness_abweichung_lokal[eltern, dienst] * dienst.aufwand)
# Gesamt-Fairness (bereits dienstabhängig in den Constraints) — keine zusätzliche Mean-Skalierung mehr
objective_terms.append(gewicht_f3_global * fairness_abweichung_gesamt_global[eltern])
objective_terms.append(gewicht_f4_lokal * fairness_abweichung_gesamt_lokal[eltern])
# P1: Bevorzugte Dienste (stärker für aufwändigere Dienste)
for (eltern, tag, dienst), praef in self.daten.praeferenzen.items():
if (eltern, tag, dienst) in x and praef == 1:
objective_terms.append(-10 * dienst.aufwand * x[eltern, tag, dienst])
# P2: Abgelehnte Dienste (stärker für aufwändigere Dienste)
for (eltern, tag, dienst), praef in self.daten.praeferenzen.items():
if (eltern, tag, dienst) in x and praef == -1:
objective_terms.append(20 * dienst.aufwand * x[eltern, tag, dienst])
if objective_terms:
prob += pulp.lpSum(objective_terms)
else:
prob += pulp.lpSum([var for var in x.values()])
print(f"Verwende Gewichtung: F1 (global) = {gewicht_f1}, F2 (lokal) = {gewicht_f2}, "
f"F3 (global) = {gewicht_f3_global}, F4 (lokal) = {gewicht_f4_lokal}")
def erstelle_optimierungsmodell(self) -> Tuple[
pulp.LpProblem,
Entscheidungsvariablen
]:
"""Erstellt das PuLP Optimierungsmodell
Returns:
Tuple mit (prob, x, ziel_dienste_lokal, ziel_dienste_global)
"""
print("Erstelle Optimierungsmodell...")
print("\nDebug: Verfuegbarkeit analysieren...")
for tag in self.daten.planungszeitraum[:5]:
verfuegbare = [e for e in self.daten.eltern if self.daten.verfuegbarkeit.get((e, tag), True)]
benoetigte = self.daten.benoetigte_dienste.get(tag, [])
print(f" {tag}: Benötigt {len(benoetigte)} Dienste {benoetigte}, verfügbar: {verfuegbare}")
prob = pulp.LpProblem("Elterndienstplaner", pulp.LpMinimize)
x = self._erstelle_entscheidungsvariablen()
self._add_constraint_ein_dienst_pro_woche(prob, x)
self._add_constraint_ein_dienst_pro_tag(prob, x)
self._add_constraint_verfuegbarkeit(prob, x)
self._add_constraint_dienst_bedarf(prob, x)
ziel_dienste_global = self.berechne_faire_zielverteilung_global()
ziel_dienste_lokal = self.berechne_faire_zielverteilung_lokal()
self.ausgabe.setze_zielverteilungen(ziel_dienste_lokal, ziel_dienste_global)
# F2: Lokale Fairness pro Diensttyp
fairness_abweichung_lokal = self._add_constraint_fairness_diensttypspezifisch(
prob, x, ziel_dienste_lokal, "lokal"
)
# F1: Globale Fairness pro Diensttyp
fairness_abweichung_global = self._add_constraint_fairness_diensttypspezifisch(
prob, x, ziel_dienste_global, "global"
)
# F3: Diensttypuebergreifende Fairness (global)
fairness_abweichung_gesamt_global = self._add_constraint_fairness_typuebergreifend(
prob, x, ziel_dienste_global, "global"
)
# F4: Diensttypuebergreifende Fairness (lokal)
fairness_abweichung_gesamt_lokal = self._add_constraint_fairness_typuebergreifend(
prob, x, ziel_dienste_lokal, "lokal"
)
self._erstelle_zielfunktion(prob, x, fairness_abweichung_lokal, fairness_abweichung_global,
fairness_abweichung_gesamt_global, fairness_abweichung_gesamt_lokal)
print(f"Modell erstellt mit {len(x)} Variablen und {len(prob.constraints)} Constraints")
return prob, x
def loese_optimierung(self, prob: pulp.LpProblem,
x: Entscheidungsvariablen) -> Optional[Dict[date, Dict[Dienst, List[Eltern]]]]:
"""Loest das Optimierungsproblem"""
print("Löse Optimierungsproblem...")
solver = None
try:
cpu_count = multiprocessing.cpu_count()
threads = max(1, cpu_count - 1)
print(f"Versuche CBC Solver mit {threads} Threads...")
solver = pulp.PULP_CBC_CMD(msg=0, timeLimit=20, threads=threads)
except Exception:
try:
print("Versuche GLPK Solver...")
solver = pulp.GLPK_CMD(msg=0)
except Exception:
print("Kein spezifizierter Solver verfügbar, verwende Standard.")
solver = None
prob.solve(solver)
status = pulp.LpStatus[prob.status]
print(f"Optimierung abgeschlossen: {status}")
if prob.status != pulp.LpStatusOptimal:
print("WARNUNG: Keine optimale Lösung gefunden!")
return None
loesung: Dict[date, Dict[Dienst, List[Eltern]]] = {}
for (eltern, tag, dienst), var in x.items():
if var.varValue and var.varValue > 0.5:
if tag not in loesung:
loesung[tag] = {}
if dienst not in loesung[tag]:
loesung[tag][dienst] = []
loesung[tag][dienst].append(eltern)
return loesung
def main() -> None:
if len(sys.argv) < 4:
print("Usage: ./elterndienstplaner.py <eingabe.csv> <eltern.csv> <ausgabe.csv> [<vorherige-ausgaben.csv>]")
sys.exit(1)
eingabe_datei = sys.argv[1]
eltern_datei = sys.argv[2]
ausgabe_datei = sys.argv[3]
vorherige_datei = sys.argv[4] if len(sys.argv) > 4 else None
print("Elterndienstplaner gestartet")
print("="*50)
try:
daten = ElterndienstplanerDaten()
daten.lade_daten(eingabe_datei, eltern_datei, vorherige_datei)
ausgabe = ElterndienstAusgabe(daten)
planer = Elterndienstplaner(daten, ausgabe)
prob, x = planer.erstelle_optimierungsmodell()
loesung = planer.loese_optimierung(prob, x)
if loesung is not None:
ausgabe.schreibe_ausgabe_csv(ausgabe_datei, loesung)
ausgabe.drucke_statistiken(loesung)
ausgabe.visualisiere_dienste_uebersicht(loesung)
ausgabe.visualisiere_verteilungen(loesung)
ausgabe.visualisiere_praeferenz_verletzungen(loesung)
print("\n✓ Planung erfolgreich abgeschlossen!")
else:
print("\n✗ Fehler: Keine gültige Lösung gefunden!")
sys.exit(1)
except Exception as e:
print(f"\n✗ Fehler: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
main()