Jan Hoheisel 08e5cf11bd Mehr Fuktionen fuer historische Dienste
Planungszeitraum und historischer Zeitraum koennen sich jetzt
ueberlappen. So lassen sich einzelne Dienste (z.B. von einer Familie)
nachtraeglich neu planen.

historische Dienste werden bei den Constraints 1 Dienst pro Tag und 1
Dienst pro Woche korrekt beruecksichtigt

elterndienstplaner.py erzeugt jetzt ausgaben-gesamt.csv, die fuer
spaetere Aufrufe als Eingabe vorherige-dienste.csv verwendet werden kann.
2026-01-28 20:30:14 +01:00

323 lines
12 KiB
Python

#!/usr/bin/env python3
"""
CSV I/O Module für Elterndienstplaner
Trennt CSV-Parsing und -Schreiben von der Business-Logik
"""
import csv
from datetime import datetime, date, timedelta
from typing import Dict, List, Tuple, DefaultDict, Optional
from collections import defaultdict
class EingabeParser:
"""Parst CSV-Eingabedateien für den Elterndienstplaner"""
@staticmethod
def parse_eingabe_csv(datei: str, dienste_lookup) -> Tuple[
List[str], # eltern
List[date], # tage
Dict[date, List], # benoetigte_dienste (Dienst-Objekte)
Dict[Tuple[str, date], bool], # verfügbarkeit
Dict[Tuple[str, date, any], int] # präferenzen (Dienst-Objekt als Key)
]:
"""
Lädt die eingabe.csv mit Terminen und Präferenzen
Args:
datei: Pfad zur eingabe.csv
dienste_lookup: Funktion (str) -> Dienst zum Auflösen von Kürzeln
Returns:
Tuple mit (eltern, tage, benoetigte_dienste, verfügbarkeit, präferenzen)
"""
print(f"Lade Eingabedaten aus {datei}...")
eltern = []
tage = []
benoetigte_dienste = {}
verfügbarkeit = {}
präferenzen = {}
with open(datei, 'r', encoding='utf-8') as f:
reader = csv.reader(f)
header = next(reader)
# Eltern aus Header extrahieren (ab Spalte 3)
eltern = [name.strip() for name in header[3:] if name.strip()]
print(f"Gefundene Eltern: {eltern}")
for row in reader:
if len(row) < 3:
continue
datum = row[0].strip()
wochentag = row[1].strip()
dienste_str = row[2].strip()
# Datum parsen
try:
datum_obj = datetime.strptime(datum, '%Y-%m-%d').date()
tage.append(datum_obj)
except ValueError:
print(f"Warnung: Ungültiges Datum {datum}")
continue
# Benötigte Dienste
benoetigte_dienste[datum_obj] = [
dienste_lookup(kuerzel) for kuerzel in dienste_str
]
# Verfügbarkeit und Präferenzen der Eltern
for i, eltern_name in enumerate(eltern):
if i + 3 < len(row):
präf_str = row[i + 3].strip()
# Verfügbarkeit prüfen
if präf_str == 'x':
verfügbarkeit[(eltern_name, datum_obj)] = False
else:
verfügbarkeit[(eltern_name, datum_obj)] = True
# Präferenzen parsen
_parse_präferenzen_string(
eltern_name, datum_obj, präf_str,
präferenzen, dienste_lookup
)
else:
# Standard: verfügbar, keine Präferenzen
verfügbarkeit[(eltern_name, datum_obj)] = True
tage.sort()
print(f"Zeitraum: {tage[0]} bis {tage[-1]} ({len(tage)} Tage)")
return eltern, tage, benoetigte_dienste, verfügbarkeit, präferenzen
@staticmethod
def parse_eltern_csv(datei: str) -> Dict[str, DefaultDict[date, float]]:
"""
Lädt die eltern.csv mit Dienstfaktoren
Args:
datei: Pfad zur eltern.csv
Returns:
Dictionary mit Dienstfaktoren für alle Tage in den definierten Zeiträumen
(DefaultDict gibt 0 für Tage außerhalb der Zeiträume zurück)
"""
print(f"Lade Elterndaten aus {datei}...")
dienstfaktoren = {}
with open(datei, 'r', encoding='utf-8') as f:
reader = csv.reader(f)
header = next(reader)
for row in reader:
if len(row) < 4:
continue
eltern_name = row[0].strip()
# Initialisiere mit DefaultDict (Standard: 0)
dienstfaktoren[eltern_name] = defaultdict(float)
# Alle Zeiträume einlesen (jeweils 3 Spalten: Beginn, Ende, Faktor)
for i in range(1, len(row), 3):
if i + 2 < len(row) and row[i].strip() and row[i + 1].strip():
try:
beginn = datetime.strptime(row[i].strip(), '%Y-%m-%d').date()
ende = datetime.strptime(row[i + 1].strip(), '%Y-%m-%d').date()
faktor = float(row[i + 2].strip()) if row[i + 2].strip() else 0
# Faktor für alle Tage im Zeitraum setzen/überschreiben
aktueller_tag = beginn
while aktueller_tag <= ende:
dienstfaktoren[eltern_name][aktueller_tag] = faktor
aktueller_tag += timedelta(days=1)
except (ValueError, IndexError):
continue
print(f"Dienstfaktoren geladen für {len(dienstfaktoren)} Eltern")
return dienstfaktoren
@staticmethod
def parse_vorherige_ausgaben_csv(
datei: str,
eltern: List[str],
dienste: List, # List[Dienst]
) -> List[Tuple[date, str, any]]: # historische_dienste (mit Dienst-Objekt)
"""
Lädt vorherige-ausgaben.csv für Fairness-Constraints
Args:
datei: Pfad zur vorherige-ausgaben.csv
eltern: Liste der Elternnamen
dienste: Liste der Dienst-Objekte
Returns:
Liste der historischen Dienste
"""
print(f"Lade vorherige Ausgaben aus {datei}...")
historische_dienste = []
try:
with open(datei, 'r', encoding='utf-8') as f:
reader = csv.reader(f)
header = next(reader)
# Dienst-Spalten finden
dienst_spalten = {}
for i, col_name in enumerate(header[2:], 2): # Ab Spalte 2 (nach Datum, Wochentag)
for dienst in dienste:
if dienst.name.lower() in col_name.lower() or dienst.kuerzel == col_name:
dienst_spalten[dienst] = i
break
for row in reader:
if len(row) < 3:
continue
# Datum parsen
try:
datum = datetime.strptime(row[0].strip(), '%Y-%m-%d').date()
except ValueError:
continue
# Zugeteilte Dienste mit Datum speichern
for dienst, spalte_idx in dienst_spalten.items():
if spalte_idx < len(row) and row[spalte_idx].strip():
# Mehrere Eltern können in einer Zelle stehen (durch " und " getrennt)
eltern_liste = row[spalte_idx].strip().split(' und ')
for eltern_name in eltern_liste:
if eltern_name in eltern:
# Historische Dienste mit Datum speichern
historische_dienste.append((datum, eltern_name, dienst))
except FileNotFoundError:
print("Keine vorherigen Ausgaben gefunden - starte ohne historische Daten")
print(f"Historische Dienste mit Datum: {len(historische_dienste)} Einträge")
return historische_dienste
class AusgabeWriter:
"""Schreibt Optimierungsergebnisse in CSV-Dateien"""
@staticmethod
def schreibe_ausgabe_csv(
datei: str,
lösung: Dict[date, Dict[any, List[str]]], # Dienst-Objekt als Key
tage: List[date],
dienste: List, # List[Dienst]
gesamt: bool = False,
historische_dienste: List[Tuple[date, str, any]] = None
) -> None:
"""
Schreibt die Lösung in die ausgabe.csv
Args:
datei: Pfad zur ausgabe.csv
lösung: Dictionary mit Zuteilungen {datum: {dienst: [eltern]}}
tage: Liste aller Planungstage (aktueller Planungszeitraum)
dienste: Liste der Dienst-Objekte
gesamt: Wenn True -> schreibe gesamte Dienste (inkl. historische_dienste).
Wenn False -> wie bisher nur die neu verplanten Dienste (lösung).
historische_dienste: Optional Liste von (datum, eltern, dienst) aus vorherige-ausgaben.csv
(wird nur ausgewertet, wenn gesamt==True)
"""
print(f"Schreibe Ergebnisse nach {datei}... (gesamt={gesamt})")
# Bestimme alle zu schreibenden Tage
if gesamt and historische_dienste:
historische_dates = {hd[0] for hd in historische_dienste}
output_dates = sorted(set(tage) | historische_dates)
else:
output_dates = sorted(tage)
# Sicherstellen: Für alle Tage im Zeitraum (von min bis max) soll eine Zeile ausgegeben werden,
# auch wenn keine Informationen vorliegen.
if output_dates:
start_date = min(output_dates)
end_date = max(output_dates)
full_dates = []
current = start_date
while current <= end_date:
full_dates.append(current)
current += timedelta(days=1)
output_dates = full_dates
# Erstelle Mapping date -> dienst -> list[eltern]
combined: Dict[date, Dict[any, List[str]]] = {}
if gesamt and historische_dienste:
for datum, eltern_name, dienst in historische_dienste:
combined.setdefault(datum, {}).setdefault(dienst, [])
if eltern_name not in combined[datum][dienst]:
combined[datum][dienst].append(eltern_name)
# Füge neue (optimierte) Zuweisungen hinzu (überschreiben/ergänzen)
for datum, dienst_map in (lösung or {}).items():
combined.setdefault(datum, {})
for dienst, eltern_liste in dienst_map.items():
combined[datum].setdefault(dienst, [])
for eltern_name in eltern_liste:
if eltern_name not in combined[datum][dienst]:
combined[datum][dienst].append(eltern_name)
with open(datei, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
# Header schreiben
header = ['Datum', 'Wochentag'] + [dienst.name for dienst in dienste]
writer.writerow(header)
# Daten schreiben
for tag in output_dates:
wochentag = ['Montag', 'Dienstag', 'Mittwoch', 'Donnerstag',
'Freitag', 'Samstag', 'Sonntag'][tag.weekday()]
row = [tag.strftime('%Y-%m-%d'), wochentag]
for dienst in dienste:
eltern_str = ''
if tag in combined and dienst in combined[tag]:
eltern_str = ' und '.join(combined[tag][dienst])
row.append(eltern_str)
writer.writerow(row)
print("Ausgabe erfolgreich geschrieben!")
# Hilfsfunktionen
def _parse_präferenzen_string(
eltern: str,
datum: date,
präf_str: str,
präferenzen: Dict,
dienste_lookup
) -> None:
"""Parst Präferenzstring wie 'F+P-E+' und fügt zu präferenzen hinzu"""
i = 0
while i < len(präf_str):
if i + 1 < len(präf_str):
dienst = dienste_lookup(präf_str[i])
if dienst and i + 1 < len(präf_str):
if präf_str[i + 1] == '+':
präferenzen[(eltern, datum, dienst)] = 1
i += 2
elif präf_str[i + 1] == '-':
präferenzen[(eltern, datum, dienst)] = -1
i += 2
else:
i += 1
else:
i += 1
else:
i += 1