690 lines
30 KiB
Python
Executable File
690 lines
30 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Elterndienstplaner - Optimale Zuteilung von Elterndiensten
|
|
|
|
Autor: Automatisch generiert
|
|
Datum: Dezember 2025
|
|
"""
|
|
|
|
import sys
|
|
import csv
|
|
import pulp
|
|
from datetime import datetime, timedelta, date
|
|
from collections import defaultdict
|
|
from typing import Dict, List, Tuple, DefaultDict, Optional
|
|
import calendar
|
|
|
|
|
|
class Dienst:
|
|
"""Repräsentiert einen Diensttyp mit allen seinen Eigenschaften"""
|
|
|
|
def __init__(self, kuerzel: str, name: str, personen_anzahl: int = 1) -> None:
|
|
self.kuerzel: str = kuerzel
|
|
self.name: str = name
|
|
self.personen_anzahl: int = personen_anzahl
|
|
|
|
def __str__(self) -> str:
|
|
return f"{self.kuerzel} ({self.name}): {self.personen_anzahl} Person(en)"
|
|
|
|
def __repr__(self) -> str:
|
|
return f"Dienst('{self.kuerzel}', '{self.name}', {self.personen_anzahl})"
|
|
|
|
def braucht_mehrere_personen(self) -> bool:
|
|
"""Gibt True zurück, wenn mehr als eine Person benötigt wird"""
|
|
return self.personen_anzahl > 1
|
|
|
|
|
|
class Elterndienstplaner:
|
|
def __init__(self) -> None:
|
|
# Dienste als Liste definieren
|
|
self.dienste: List[Dienst] = [
|
|
Dienst('F', 'Frühstücksdienst', 1),
|
|
Dienst('P', 'Putznotdienst', 1),
|
|
Dienst('E', 'Essensausgabenotdienst', 1),
|
|
Dienst('K', 'Kochen', 1),
|
|
Dienst('A', 'Elternabend', 2)
|
|
]
|
|
|
|
# Datenstrukturen
|
|
self.tage: List[date] = []
|
|
self.eltern: List[str] = []
|
|
self.benoetigte_dienste: Dict[date, List[Dienst]] = {}
|
|
self.verfügbarkeit: Dict[Tuple[str, date], bool] = {}
|
|
self.präferenzen: Dict[Tuple[str, date, Dienst], int] = {}
|
|
self.dienstfaktoren: Dict[str, Dict[date, float]] = {}
|
|
self.alle_zeitraeume: Dict[str, List[Tuple[date, date, float]]] = {}
|
|
self.vorherige_dienste: DefaultDict[str, DefaultDict[Dienst, int]] = \
|
|
defaultdict(lambda: defaultdict(int))
|
|
self.historische_dienste: List[Tuple[date, str, Dienst]] = []
|
|
|
|
def get_dienst(self, kuerzel: str) -> Optional[Dienst]:
|
|
"""Gibt das Dienst-Objekt für ein Kürzel zurück"""
|
|
for dienst in self.dienste:
|
|
if dienst.kuerzel == kuerzel:
|
|
return dienst
|
|
return None
|
|
|
|
def add_dienst(self, kuerzel: str, name: str, personen_anzahl: int = 1) -> Dienst:
|
|
"""Fügt einen neuen Dienst hinzu"""
|
|
dienst = Dienst(kuerzel, name, personen_anzahl)
|
|
self.dienste.append(dienst)
|
|
return dienst
|
|
|
|
def print_dienste_info(self) -> None:
|
|
"""Druckt Informationen über alle konfigurierten Dienste"""
|
|
print("Konfigurierte Dienste:")
|
|
for dienst in self.dienste:
|
|
print(f" {dienst}")
|
|
|
|
def lade_eingabe_csv(self, datei: str) -> None:
|
|
"""Lädt die eingabe.csv mit Terminen und Präferenzen"""
|
|
print(f"Lade Eingabedaten aus {datei}...")
|
|
|
|
with open(datei, 'r', encoding='utf-8') as f:
|
|
reader = csv.reader(f)
|
|
header = next(reader)
|
|
|
|
# Eltern aus Header extrahieren (ab Spalte 3)
|
|
self.eltern = [name.strip() for name in header[3:] if name.strip()]
|
|
print(f"Gefundene Eltern: {self.eltern}")
|
|
|
|
for row in reader:
|
|
if len(row) < 3:
|
|
continue
|
|
|
|
datum = row[0].strip()
|
|
wochentag = row[1].strip()
|
|
dienste_str = row[2].strip()
|
|
|
|
# Datum parsen
|
|
try:
|
|
datum_obj = datetime.strptime(datum, '%Y-%m-%d').date()
|
|
self.tage.append(datum_obj)
|
|
except ValueError:
|
|
print(f"Warnung: Ungültiges Datum {datum}")
|
|
continue
|
|
|
|
# Benötigte Dienste
|
|
dienst_objekte = []
|
|
for kuerzel in dienste_str:
|
|
dienst = self.get_dienst(kuerzel)
|
|
if dienst:
|
|
dienst_objekte.append(dienst)
|
|
self.benoetigte_dienste[datum_obj] = dienst_objekte
|
|
|
|
# Verfügbarkeit und Präferenzen der Eltern
|
|
for i, eltern_name in enumerate(self.eltern):
|
|
if i + 3 < len(row):
|
|
präf_str = row[i + 3].strip()
|
|
|
|
# Verfügbarkeit prüfen
|
|
if präf_str == 'x':
|
|
self.verfügbarkeit[(eltern_name, datum_obj)] = False
|
|
else:
|
|
self.verfügbarkeit[(eltern_name, datum_obj)] = True
|
|
|
|
# Präferenzen parsen
|
|
self._parse_präferenzen(eltern_name, datum_obj, präf_str)
|
|
else:
|
|
# Standard: verfügbar, keine Präferenzen
|
|
self.verfügbarkeit[(eltern_name, datum_obj)] = True
|
|
|
|
self.tage.sort()
|
|
print(f"Zeitraum: {self.tage[0]} bis {self.tage[-1]} ({len(self.tage)} Tage)")
|
|
|
|
def _parse_präferenzen(self, eltern: str, datum: date, präf_str: str) -> None:
|
|
"""Parst Präferenzstring wie 'F+P-E+' """
|
|
i = 0
|
|
while i < len(präf_str):
|
|
if i + 1 < len(präf_str):
|
|
dienst = self.get_dienst(präf_str[i])
|
|
if dienst and i + 1 < len(präf_str):
|
|
if präf_str[i + 1] == '+':
|
|
self.präferenzen[(eltern, datum, dienst)] = 1
|
|
i += 2
|
|
elif präf_str[i + 1] == '-':
|
|
self.präferenzen[(eltern, datum, dienst)] = -1
|
|
i += 2
|
|
else:
|
|
i += 1
|
|
else:
|
|
i += 1
|
|
else:
|
|
i += 1
|
|
|
|
def lade_eltern_csv(self, datei: str) -> None:
|
|
"""Lädt die eltern.csv mit Dienstfaktoren"""
|
|
print(f"Lade Elterndaten aus {datei}...")
|
|
|
|
with open(datei, 'r', encoding='utf-8') as f:
|
|
reader = csv.reader(f)
|
|
header = next(reader)
|
|
|
|
for row in reader:
|
|
if len(row) < 4:
|
|
continue
|
|
|
|
eltern_name = row[0].strip()
|
|
|
|
# Initialisiere Datenstrukturen
|
|
self.dienstfaktoren[eltern_name] = {}
|
|
self.alle_zeitraeume[eltern_name] = []
|
|
|
|
# Alle Zeiträume einlesen (jeweils 3 Spalten: Beginn, Ende, Faktor)
|
|
for i in range(1, len(row), 3):
|
|
if i + 2 < len(row) and row[i].strip() and row[i + 1].strip():
|
|
try:
|
|
beginn = datetime.strptime(row[i].strip(), '%Y-%m-%d').date()
|
|
ende = datetime.strptime(row[i + 1].strip(), '%Y-%m-%d').date()
|
|
faktor = float(row[i + 2].strip()) if row[i + 2].strip() else 0
|
|
|
|
# Zeitraum speichern
|
|
self.alle_zeitraeume[eltern_name].append((beginn, ende, faktor))
|
|
|
|
# Faktor für Tage im aktuellen Planungsmonat setzen
|
|
for tag in self.tage:
|
|
if beginn <= tag <= ende:
|
|
self.dienstfaktoren[eltern_name][tag] = faktor
|
|
|
|
except (ValueError, IndexError):
|
|
continue
|
|
|
|
# Tage ohne expliziten Faktor auf 0 setzen
|
|
for tag in self.tage:
|
|
if tag not in self.dienstfaktoren[eltern_name]:
|
|
self.dienstfaktoren[eltern_name][tag] = 0
|
|
|
|
print(f"Dienstfaktoren geladen für {len(self.dienstfaktoren)} Eltern")
|
|
print(f"Zeiträume gespeichert für globale Fairness-Berechnung")
|
|
|
|
def lade_vorherige_ausgaben_csv(self, datei: str) -> None:
|
|
"""Lädt vorherige-ausgaben.csv für Fairness-Constraints"""
|
|
print(f"Lade vorherige Ausgaben aus {datei}...")
|
|
|
|
try:
|
|
with open(datei, 'r', encoding='utf-8') as f:
|
|
reader = csv.reader(f)
|
|
header = next(reader)
|
|
|
|
# Dienst-Spalten finden
|
|
dienst_spalten = {}
|
|
for i, col_name in enumerate(header[2:], 2): # Ab Spalte 2 (nach Datum, Wochentag)
|
|
for dienst in self.dienste:
|
|
if dienst.name.lower() in col_name.lower() or dienst.kuerzel == col_name:
|
|
dienst_spalten[dienst] = i
|
|
break
|
|
|
|
for row in reader:
|
|
if len(row) < 3:
|
|
continue
|
|
|
|
# Datum parsen
|
|
try:
|
|
datum = datetime.strptime(row[0].strip(), '%Y-%m-%d').date()
|
|
except ValueError:
|
|
continue
|
|
|
|
# Zugeteilte Dienste zählen UND mit Datum speichern
|
|
for dienst, spalte_idx in dienst_spalten.items():
|
|
if spalte_idx < len(row) and row[spalte_idx].strip():
|
|
# Mehrere Eltern können in einer Zelle stehen (durch Leerzeichen getrennt)
|
|
eltern_liste = row[spalte_idx].strip().split()
|
|
for eltern_name in eltern_liste:
|
|
if eltern_name in self.eltern:
|
|
# Summierung für Kompatibilität
|
|
self.vorherige_dienste[eltern_name][dienst] += 1
|
|
# Historische Dienste mit Datum speichern
|
|
self.historische_dienste.append((datum, eltern_name, dienst))
|
|
|
|
except FileNotFoundError:
|
|
print("Keine vorherigen Ausgaben gefunden - starte ohne historische Daten")
|
|
|
|
print(f"Vorherige Dienste geladen: {dict(self.vorherige_dienste)}")
|
|
print(f"Historische Dienste mit Datum: {len(self.historische_dienste)} Einträge")
|
|
|
|
def berechne_dienstfaktor_an_datum(self, eltern: str, datum: date) -> float:
|
|
"""Berechnet den Dienstfaktor eines Elternteils an einem bestimmten Datum"""
|
|
if eltern not in self.alle_zeitraeume:
|
|
return 0
|
|
|
|
for beginn, ende, faktor in self.alle_zeitraeume[eltern]:
|
|
if beginn <= datum <= ende:
|
|
return faktor
|
|
return 0
|
|
|
|
def berechne_faire_zielverteilung(self) -> DefaultDict[str, DefaultDict[Dienst, float]]:
|
|
"""Berechnet die faire Zielanzahl von Diensten pro Eltern-Dienst-Kombination
|
|
basierend auf tatsächlich geleisteten historischen Diensten und deren fairer Umverteilung"""
|
|
|
|
ziel_dienste: DefaultDict[str, DefaultDict[Dienst, float]] = \
|
|
defaultdict(lambda: defaultdict(float))
|
|
|
|
print("\nBerechne faire Zielverteilung basierend auf historischen Daten...")
|
|
|
|
# Historische Dienste nach Datum gruppieren
|
|
historische_tage = set(datum for datum, _, _ in self.historische_dienste) if self.historische_dienste else set()
|
|
print(f" Analysiere {len(historische_tage)} historische Tage mit {len(self.historische_dienste)} Diensten")
|
|
|
|
for dienst in self.dienste:
|
|
print(f" Verarbeite Dienst {dienst.kuerzel}...")
|
|
|
|
# 1. HISTORISCHE PERIODE: Faire Umverteilung der tatsächlich geleisteten Dienste
|
|
historische_dienste_dieses_typs = [
|
|
(datum, eltern) for datum, eltern, d in self.historische_dienste
|
|
if d == dienst
|
|
]
|
|
|
|
print(f" Gefundene historische {dienst.kuerzel}-Dienste: {len(historische_dienste_dieses_typs)}")
|
|
|
|
# Gruppiere nach Datum
|
|
dienste_pro_tag = defaultdict(list)
|
|
for datum, eltern in historische_dienste_dieses_typs:
|
|
dienste_pro_tag[datum].append(eltern)
|
|
|
|
# Für jeden historischen Tag faire Umverteilung berechnen
|
|
for hist_datum, geleistete_eltern in dienste_pro_tag.items():
|
|
anzahl_dienste = len(geleistete_eltern) # Anzahl Dienste an diesem Tag
|
|
|
|
# Dienstfaktoren aller Eltern für diesen historischen Tag berechnen
|
|
dienstfaktoren_tag = {}
|
|
gesamt_dienstfaktor_tag = 0
|
|
|
|
for eltern in self.eltern:
|
|
faktor = self.berechne_dienstfaktor_an_datum(eltern, hist_datum)
|
|
dienstfaktoren_tag[eltern] = faktor
|
|
gesamt_dienstfaktor_tag += faktor
|
|
|
|
# Faire Umverteilung der an diesem Tag geleisteten Dienste
|
|
if gesamt_dienstfaktor_tag > 0:
|
|
for eltern in self.eltern:
|
|
if dienstfaktoren_tag[eltern] > 0:
|
|
anteil = dienstfaktoren_tag[eltern] / gesamt_dienstfaktor_tag
|
|
faire_zuteilung = anteil * anzahl_dienste
|
|
ziel_dienste[eltern][dienst] += faire_zuteilung
|
|
|
|
if faire_zuteilung > 0.01: # Debug nur für relevante Werte
|
|
print(f" {hist_datum}: {eltern} Faktor={dienstfaktoren_tag[eltern]} "
|
|
f"-> {faire_zuteilung:.2f} von {anzahl_dienste} Diensten")
|
|
|
|
# 2. AKTUELLER MONAT: Faire Verteilung der benötigten Dienste
|
|
benoetigte_dienste_monat = sum(
|
|
1 for tag in self.tage
|
|
if dienst in self.benoetigte_dienste.get(tag, [])
|
|
)
|
|
|
|
# Multipliziere mit Anzahl benötigter Personen pro Dienst
|
|
benoetigte_dienste_monat *= dienst.personen_anzahl
|
|
|
|
if benoetigte_dienste_monat > 0:
|
|
# Gesamtdienstfaktor für aktuellen Monat
|
|
gesamt_dienstfaktor_monat = sum(
|
|
sum(self.dienstfaktoren.get(e, {}).get(tag, 0) for tag in self.tage)
|
|
for e in self.eltern
|
|
)
|
|
|
|
if gesamt_dienstfaktor_monat > 0:
|
|
for eltern in self.eltern:
|
|
monatsfaktor = sum(
|
|
self.dienstfaktoren.get(eltern, {}).get(tag, 0)
|
|
for tag in self.tage
|
|
)
|
|
if monatsfaktor > 0:
|
|
anteil = monatsfaktor / gesamt_dienstfaktor_monat
|
|
faire_zuteilung = anteil * benoetigte_dienste_monat
|
|
ziel_dienste[eltern][dienst] += faire_zuteilung
|
|
|
|
# Debug-Ausgabe für diesen Dienst
|
|
total_historisch = sum(
|
|
ziel_dienste[e][dienst] - (
|
|
sum(self.dienstfaktoren.get(e, {}).get(tag, 0) for tag in self.tage) /
|
|
sum(sum(self.dienstfaktoren.get(e2, {}).get(tag, 0) for tag in self.tage) for e2 in self.eltern) * benoetigte_dienste_monat
|
|
if sum(sum(self.dienstfaktoren.get(e2, {}).get(tag, 0) for tag in self.tage) for e2 in self.eltern) > 0 else 0
|
|
) for e in self.eltern
|
|
) if len(historische_dienste_dieses_typs) > 0 else 0
|
|
|
|
print(f" {dienst.kuerzel}: Historisch faire Summe={total_historisch:.1f}, "
|
|
f"Aktuell benötigt={benoetigte_dienste_monat}")
|
|
|
|
# Debug-Output: Detaillierte Zielverteilung
|
|
print("\n Berechnete Zielverteilung (basierend auf tatsächlichen historischen Diensten):")
|
|
for eltern in sorted(self.eltern):
|
|
for dienst in self.dienste:
|
|
if ziel_dienste[eltern][dienst] > 0.1: # Nur relevante Werte
|
|
ist = self.vorherige_dienste[eltern][dienst]
|
|
ziel = ziel_dienste[eltern][dienst]
|
|
print(f" {eltern} {dienst.kuerzel}: IST={ist}, FAIRE_ZIEL={ziel:.2f}, DIFF={ziel-ist:.2f}")
|
|
|
|
return ziel_dienste
|
|
|
|
def erstelle_optimierungsmodell(self) -> Tuple[pulp.LpProblem, Dict[Tuple[str, date, Dienst], pulp.LpVariable]]:
|
|
"""Erstellt das PuLP Optimierungsmodell"""
|
|
print("Erstelle Optimierungsmodell...")
|
|
|
|
# Debugging: Verfügbarkeit prüfen
|
|
print("\nDebug: Verfügbarkeit analysieren...")
|
|
for tag in self.tage[:5]: # Erste 5 Tage
|
|
verfügbare = [e for e in self.eltern if self.verfügbarkeit.get((e, tag), True)]
|
|
benötigte = self.benoetigte_dienste.get(tag, [])
|
|
print(f" {tag}: Benötigt {len(benötigte)} Dienste {benötigte}, verfügbar: {verfügbare}")
|
|
|
|
# LP Problem erstellen
|
|
prob = pulp.LpProblem("Elterndienstplaner", pulp.LpMinimize)
|
|
|
|
# Entscheidungsvariablen: x[eltern, tag, dienst] ∈ {0,1}
|
|
x: Dict[Tuple[str, date, Dienst], pulp.LpVariable] = {}
|
|
for eltern in self.eltern:
|
|
for tag in self.tage:
|
|
for dienst in self.dienste:
|
|
if dienst in self.benoetigte_dienste.get(tag, []):
|
|
x[eltern, tag, dienst] = pulp.LpVariable(
|
|
f"x_{eltern.replace(' ', '_')}_{tag}_{dienst.kuerzel}",
|
|
cat='Binary'
|
|
)
|
|
|
|
# Vereinfachtes Modell: Grundlegende Constraints
|
|
|
|
# C1: Je Eltern und Dienst nur einmal die Woche
|
|
woche_start = self.tage[0]
|
|
woche_nr = 0
|
|
while woche_start <= self.tage[-1]:
|
|
woche_ende = min(woche_start + timedelta(days=6), self.tage[-1])
|
|
woche_tage = [t for t in self.tage if woche_start <= t <= woche_ende]
|
|
|
|
for eltern in self.eltern:
|
|
for dienst in self.dienste:
|
|
woche_vars = []
|
|
for tag in woche_tage:
|
|
if (eltern, tag, dienst) in x:
|
|
woche_vars.append(x[eltern, tag, dienst])
|
|
|
|
if woche_vars:
|
|
prob += pulp.lpSum(woche_vars) <= 1, f"C1_{eltern.replace(' ', '_')}_{dienst.kuerzel}_w{woche_nr}"
|
|
|
|
woche_start += timedelta(days=7)
|
|
woche_nr += 1
|
|
|
|
# C2: Je Eltern nur einen Dienst am Tag
|
|
for eltern in self.eltern:
|
|
for tag in self.tage:
|
|
tag_vars = []
|
|
for dienst in self.dienste:
|
|
if (eltern, tag, dienst) in x:
|
|
tag_vars.append(x[eltern, tag, dienst])
|
|
|
|
if tag_vars:
|
|
prob += pulp.lpSum(tag_vars) <= 1, f"C2_{eltern.replace(' ', '_')}_{tag}"
|
|
|
|
# C3: Dienste nur verfügbaren Eltern zuteilen
|
|
for eltern in self.eltern:
|
|
for tag in self.tage:
|
|
if not self.verfügbarkeit.get((eltern, tag), True):
|
|
for dienst in self.dienste:
|
|
if (eltern, tag, dienst) in x:
|
|
prob += x[eltern, tag, dienst] == 0, f"C3_{eltern.replace(' ', '_')}_{tag}_{dienst.kuerzel}"
|
|
|
|
# Alle benötigten Dienste müssen zugeteilt werden (flexibel)
|
|
for tag in self.tage:
|
|
for dienst in self.benoetigte_dienste.get(tag, []):
|
|
dienst_vars = []
|
|
verfuegbare_eltern = 0
|
|
for eltern in self.eltern:
|
|
if (eltern, tag, dienst) in x:
|
|
# Prüfe ob Eltern verfügbar
|
|
if self.verfügbarkeit.get((eltern, tag), True):
|
|
dienst_vars.append(x[eltern, tag, dienst])
|
|
verfuegbare_eltern += 1
|
|
|
|
if dienst_vars:
|
|
# Anzahl benötigter Personen pro Dienst (aus Dienst-Objekt)
|
|
benoetigte_personen = dienst.personen_anzahl
|
|
prob += pulp.lpSum(dienst_vars) == benoetigte_personen, f"Bedarf_{tag}_{dienst.kuerzel}"
|
|
|
|
# FAIRNESS-CONSTRAINTS UND ZIELFUNKTION
|
|
objective_terms = []
|
|
|
|
# Berechne faire Zielverteilung
|
|
ziel_dienste = self.berechne_faire_zielverteilung()
|
|
|
|
# Hilfsvariablen für Fairness-Abweichungen
|
|
fairness_abweichung_lokal = {} # F2
|
|
fairness_abweichung_global = {} # F1
|
|
|
|
for eltern in self.eltern:
|
|
for dienst in self.dienste:
|
|
fairness_abweichung_lokal[eltern, dienst] = pulp.LpVariable(
|
|
f"fair_lokal_{eltern.replace(' ', '_')}_{dienst.kuerzel}", lowBound=0)
|
|
fairness_abweichung_global[eltern, dienst] = pulp.LpVariable(
|
|
f"fair_global_{eltern.replace(' ', '_')}_{dienst.kuerzel}", lowBound=0)
|
|
|
|
# F1: Globale Fairness & F2: Lokale Fairness
|
|
for eltern in self.eltern:
|
|
for dienst in self.dienste:
|
|
# Dienstfaktor für aktuellen Monat
|
|
monatlicher_dienstfaktor = sum(
|
|
self.dienstfaktoren.get(eltern, {}).get(tag, 0) for tag in self.tage
|
|
)
|
|
|
|
if monatlicher_dienstfaktor > 0:
|
|
# Tatsächliche Dienste im aktuellen Monat
|
|
tatsaechliche_dienste_monat = pulp.lpSum(
|
|
x[eltern, tag, dienst]
|
|
for tag in self.tage
|
|
if (eltern, tag, dienst) in x
|
|
)
|
|
|
|
# F2: Lokale Fairness - nur aktueller Monat
|
|
benoetigte_dienste_monat = sum(
|
|
1 for tag in self.tage
|
|
if dienst in self.benoetigte_dienste.get(tag, [])
|
|
)
|
|
# Multipliziere mit Anzahl benötigter Personen pro Dienst
|
|
benoetigte_dienste_monat *= dienst.personen_anzahl
|
|
|
|
gesamt_dienstfaktor_monat = sum(
|
|
sum(self.dienstfaktoren.get(e, {}).get(tag, 0) for tag in self.tage)
|
|
for e in self.eltern
|
|
)
|
|
|
|
if gesamt_dienstfaktor_monat > 0 and benoetigte_dienste_monat > 0:
|
|
erwartete_dienste_lokal = (
|
|
monatlicher_dienstfaktor / gesamt_dienstfaktor_monat
|
|
) * benoetigte_dienste_monat
|
|
|
|
# F2: Lokale Fairness-Constraints
|
|
prob += (tatsaechliche_dienste_monat - erwartete_dienste_lokal <=
|
|
fairness_abweichung_lokal[eltern, dienst])
|
|
prob += (erwartete_dienste_lokal - tatsaechliche_dienste_monat <=
|
|
fairness_abweichung_lokal[eltern, dienst])
|
|
|
|
# F1: Globale Fairness - basierend auf berechneter Zielverteilung
|
|
ziel_gesamt = ziel_dienste[eltern][dienst]
|
|
vorherige_dienste = self.vorherige_dienste[eltern][dienst]
|
|
|
|
if ziel_gesamt > 0:
|
|
# Tatsächliche Dienste global (Vergangenheit + geplant)
|
|
total_dienste_inkl_vergangenheit = tatsaechliche_dienste_monat + vorherige_dienste
|
|
|
|
# F1: Globale Fairness-Constraints
|
|
prob += (total_dienste_inkl_vergangenheit - ziel_gesamt <=
|
|
fairness_abweichung_global[eltern, dienst])
|
|
prob += (ziel_gesamt - total_dienste_inkl_vergangenheit <=
|
|
fairness_abweichung_global[eltern, dienst])
|
|
|
|
# Gewichtung: Jahresanfang F1 stärker, Jahresende F2 stärker
|
|
# Annahme: September = Jahresanfang, Juli = Jahresende
|
|
aktueller_monat = self.tage[0].month if self.tage else 1
|
|
if 9 <= aktueller_monat <= 12: # Sep-Dez: Jahresanfang
|
|
gewicht_f1 = 100 # Global wichtiger
|
|
gewicht_f2 = 50 # Lokal weniger wichtig
|
|
elif 1 <= aktueller_monat <= 3: # Jan-Mar: Jahresmitte
|
|
gewicht_f1 = 75
|
|
gewicht_f2 = 75
|
|
else: # Apr-Jul: Jahresende
|
|
gewicht_f1 = 50 # Global weniger wichtig
|
|
gewicht_f2 = 100 # Lokal wichtiger
|
|
|
|
# Fairness-Terme zur Zielfunktion hinzufügen
|
|
for eltern in self.eltern:
|
|
for dienst in self.dienste:
|
|
objective_terms.append(gewicht_f1 * fairness_abweichung_global[eltern, dienst])
|
|
objective_terms.append(gewicht_f2 * fairness_abweichung_lokal[eltern, dienst])
|
|
|
|
# P1: Bevorzugte Dienste (positiv belohnen)
|
|
for (eltern, tag, dienst), präf in self.präferenzen.items():
|
|
if (eltern, tag, dienst) in x and präf == 1: # bevorzugt
|
|
objective_terms.append(-5 * x[eltern, tag, dienst]) # Schwächer als Fairness
|
|
|
|
# P2: Abgelehnte Dienste (bestrafen)
|
|
for (eltern, tag, dienst), präf in self.präferenzen.items():
|
|
if (eltern, tag, dienst) in x and präf == -1: # abgelehnt
|
|
objective_terms.append(25 * x[eltern, tag, dienst]) # Schwächer als Fairness
|
|
|
|
# Zielfunktion setzen
|
|
if objective_terms:
|
|
prob += pulp.lpSum(objective_terms)
|
|
else:
|
|
# Fallback: Minimiere Gesamtanzahl Dienste
|
|
prob += pulp.lpSum([var for var in x.values()])
|
|
|
|
print(f"Verwende Gewichtung: F1 (global) = {gewicht_f1}, F2 (lokal) = {gewicht_f2}")
|
|
print(f"Modell erstellt mit {len(x)} Variablen und {len(prob.constraints)} Constraints")
|
|
return prob, x
|
|
|
|
def löse_optimierung(self, prob: pulp.LpProblem,
|
|
x: Dict[Tuple[str, date, Dienst], pulp.LpVariable]) -> Optional[Dict[date, Dict[Dienst, List[str]]]]:
|
|
"""Löst das Optimierungsproblem"""
|
|
print("Löse Optimierungsproblem...")
|
|
|
|
# Solver wählen (verfügbare Solver testen)
|
|
solver = None
|
|
try:
|
|
print("Versuche CBC Solver...")
|
|
solver = pulp.PULP_CBC_CMD(msg=0, timeLimit=10) # Standard CBC Solver
|
|
except:
|
|
try:
|
|
print("Versuche GLPK Solver...")
|
|
solver = pulp.GLPK_CMD(msg=0) # GLPK falls verfügbar
|
|
except:
|
|
print("Kein spezifizierter Solver verfügbar, verwende Standard.")
|
|
solver = None # Default Solver
|
|
|
|
prob.solve(solver)
|
|
|
|
status = pulp.LpStatus[prob.status]
|
|
print(f"Optimierung abgeschlossen: {status}")
|
|
|
|
if prob.status != pulp.LpStatusOptimal:
|
|
print("WARNUNG: Keine optimale Lösung gefunden!")
|
|
return None
|
|
|
|
# Lösung extrahieren
|
|
lösung: Dict[date, Dict[Dienst, List[str]]] = {}
|
|
for (eltern, tag, dienst), var in x.items():
|
|
if var.varValue and var.varValue > 0.5: # Binary variable ist 1
|
|
if tag not in lösung:
|
|
lösung[tag] = {}
|
|
if dienst not in lösung[tag]:
|
|
lösung[tag][dienst] = []
|
|
lösung[tag][dienst].append(eltern)
|
|
|
|
return lösung
|
|
|
|
def schreibe_ausgabe_csv(self, datei: str, lösung: Dict[date, Dict[Dienst, List[str]]]) -> None:
|
|
"""Schreibt die Lösung in die ausgabe.csv"""
|
|
print(f"Schreibe Ergebnisse nach {datei}...")
|
|
|
|
with open(datei, 'w', newline='', encoding='utf-8') as f:
|
|
writer = csv.writer(f)
|
|
|
|
# Header schreiben
|
|
header = ['Datum', 'Wochentag'] + [dienst.name for dienst in self.dienste]
|
|
writer.writerow(header)
|
|
|
|
# Daten schreiben
|
|
for tag in sorted(self.tage):
|
|
wochentag = ['Montag', 'Dienstag', 'Mittwoch', 'Donnerstag', 'Freitag', 'Samstag', 'Sonntag'][tag.weekday()]
|
|
|
|
row = [tag.strftime('%Y-%m-%d'), wochentag]
|
|
|
|
for dienst in self.dienste:
|
|
if tag in lösung and dienst in lösung[tag]:
|
|
eltern_str = ' '.join(lösung[tag][dienst])
|
|
else:
|
|
eltern_str = ''
|
|
row.append(eltern_str)
|
|
|
|
writer.writerow(row)
|
|
|
|
print("Ausgabe erfolgreich geschrieben!")
|
|
|
|
def drucke_statistiken(self, lösung: Dict[date, Dict[Dienst, List[str]]]) -> None:
|
|
"""Druckt Statistiken zur Lösung"""
|
|
print("\n" + "="*50)
|
|
print("STATISTIKEN")
|
|
print("="*50)
|
|
|
|
# Dienste pro Eltern zählen
|
|
dienste_pro_eltern = defaultdict(lambda: defaultdict(int))
|
|
for tag, tag_dienste in lösung.items():
|
|
for dienst, eltern_liste in tag_dienste.items():
|
|
for eltern in eltern_liste:
|
|
dienste_pro_eltern[eltern][dienst] += 1
|
|
|
|
# Gesamtübersicht
|
|
print("\nDienste pro Eltern:")
|
|
for eltern in sorted(self.eltern):
|
|
gesamt = sum(dienste_pro_eltern[eltern].values())
|
|
dienste_detail = ', '.join(f"{dienst.kuerzel}:{dienste_pro_eltern[eltern][dienst]}"
|
|
for dienst in self.dienste if dienste_pro_eltern[eltern][dienst] > 0)
|
|
print(f" {eltern:15} {gesamt:3d} ({dienste_detail})")
|
|
|
|
# Dienstfaktor-Analyse
|
|
print(f"\nDienstfaktoren im Planungszeitraum:")
|
|
for eltern in sorted(self.eltern):
|
|
faktor_summe = sum(self.dienstfaktoren.get(eltern, {}).get(tag, 0) for tag in self.tage)
|
|
print(f" {eltern:15} {faktor_summe:.1f}")
|
|
|
|
|
|
def main() -> None:
|
|
if len(sys.argv) < 4:
|
|
print("Usage: ./elterndienstplaner.py <eingabe.csv> <eltern.csv> <ausgabe.csv> [<vorherige-ausgaben.csv>]")
|
|
sys.exit(1)
|
|
|
|
eingabe_datei = sys.argv[1]
|
|
eltern_datei = sys.argv[2]
|
|
ausgabe_datei = sys.argv[3]
|
|
vorherige_datei = sys.argv[4] if len(sys.argv) > 4 else None
|
|
|
|
print("Elterndienstplaner gestartet")
|
|
print("="*50)
|
|
|
|
try:
|
|
planer = Elterndienstplaner()
|
|
|
|
# Daten laden
|
|
planer.lade_eingabe_csv(eingabe_datei)
|
|
planer.lade_eltern_csv(eltern_datei)
|
|
if vorherige_datei:
|
|
planer.lade_vorherige_ausgaben_csv(vorherige_datei)
|
|
|
|
# Optimierung
|
|
prob, x = planer.erstelle_optimierungsmodell()
|
|
lösung = planer.löse_optimierung(prob, x)
|
|
|
|
if lösung is not None:
|
|
# Ergebnisse ausgeben
|
|
planer.schreibe_ausgabe_csv(ausgabe_datei, lösung)
|
|
planer.drucke_statistiken(lösung)
|
|
print("\n✓ Planung erfolgreich abgeschlossen!")
|
|
else:
|
|
print("\n✗ Fehler: Keine gültige Lösung gefunden!")
|
|
sys.exit(1)
|
|
|
|
except Exception as e:
|
|
print(f"\n✗ Fehler: {e}")
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|