#!/usr/bin/env python3 """ CSV I/O Module für Elterndienstplaner Trennt CSV-Parsing und -Schreiben von der Business-Logik """ import csv from datetime import datetime, date, timedelta from typing import Dict, List, Tuple, DefaultDict from collections import defaultdict class EingabeParser: """Parst CSV-Eingabedateien für den Elterndienstplaner""" @staticmethod def parse_eingabe_csv(datei: str, dienste_lookup) -> Tuple[ List[str], # eltern List[date], # tage Dict[date, List], # benoetigte_dienste (Dienst-Objekte) Dict[Tuple[str, date], bool], # verfügbarkeit Dict[Tuple[str, date, any], int] # präferenzen (Dienst-Objekt als Key) ]: """ Lädt die eingabe.csv mit Terminen und Präferenzen Args: datei: Pfad zur eingabe.csv dienste_lookup: Funktion (str) -> Dienst zum Auflösen von Kürzeln Returns: Tuple mit (eltern, tage, benoetigte_dienste, verfügbarkeit, präferenzen) """ print(f"Lade Eingabedaten aus {datei}...") eltern = [] tage = [] benoetigte_dienste = {} verfügbarkeit = {} präferenzen = {} with open(datei, 'r', encoding='utf-8') as f: reader = csv.reader(f) header = next(reader) # Eltern aus Header extrahieren (ab Spalte 3) eltern = [name.strip() for name in header[3:] if name.strip()] print(f"Gefundene Eltern: {eltern}") for row in reader: if len(row) < 3: continue datum = row[0].strip() wochentag = row[1].strip() dienste_str = row[2].strip() # Datum parsen try: datum_obj = datetime.strptime(datum, '%Y-%m-%d').date() tage.append(datum_obj) except ValueError: print(f"Warnung: Ungültiges Datum {datum}") continue # Benötigte Dienste dienst_objekte = [] for kuerzel in dienste_str: dienst = dienste_lookup(kuerzel) if dienst: dienst_objekte.append(dienst) benoetigte_dienste[datum_obj] = dienst_objekte # Verfügbarkeit und Präferenzen der Eltern for i, eltern_name in enumerate(eltern): if i + 3 < len(row): präf_str = row[i + 3].strip() # Verfügbarkeit prüfen if präf_str == 'x': verfügbarkeit[(eltern_name, datum_obj)] = False else: verfügbarkeit[(eltern_name, datum_obj)] = True # Präferenzen parsen _parse_präferenzen_string( eltern_name, datum_obj, präf_str, präferenzen, dienste_lookup ) else: # Standard: verfügbar, keine Präferenzen verfügbarkeit[(eltern_name, datum_obj)] = True tage.sort() print(f"Zeitraum: {tage[0]} bis {tage[-1]} ({len(tage)} Tage)") return eltern, tage, benoetigte_dienste, verfügbarkeit, präferenzen @staticmethod def parse_eltern_csv(datei: str) -> Dict[str, DefaultDict[date, float]]: """ Lädt die eltern.csv mit Dienstfaktoren Args: datei: Pfad zur eltern.csv Returns: Dictionary mit Dienstfaktoren für alle Tage in den definierten Zeiträumen (DefaultDict gibt 0 für Tage außerhalb der Zeiträume zurück) """ print(f"Lade Elterndaten aus {datei}...") dienstfaktoren = {} with open(datei, 'r', encoding='utf-8') as f: reader = csv.reader(f) header = next(reader) for row in reader: if len(row) < 4: continue eltern_name = row[0].strip() # Initialisiere mit DefaultDict (Standard: 0) dienstfaktoren[eltern_name] = defaultdict(float) # Alle Zeiträume einlesen (jeweils 3 Spalten: Beginn, Ende, Faktor) for i in range(1, len(row), 3): if i + 2 < len(row) and row[i].strip() and row[i + 1].strip(): try: beginn = datetime.strptime(row[i].strip(), '%Y-%m-%d').date() ende = datetime.strptime(row[i + 1].strip(), '%Y-%m-%d').date() faktor = float(row[i + 2].strip()) if row[i + 2].strip() else 0 # Faktor für alle Tage im Zeitraum setzen/überschreiben aktueller_tag = beginn while aktueller_tag <= ende: dienstfaktoren[eltern_name][aktueller_tag] = faktor aktueller_tag += timedelta(days=1) except (ValueError, IndexError): continue print(f"Dienstfaktoren geladen für {len(dienstfaktoren)} Eltern") return dienstfaktoren @staticmethod def parse_vorherige_ausgaben_csv( datei: str, eltern: List[str], dienste: List, # List[Dienst] ) -> List[Tuple[date, str, any]]: # historische_dienste (mit Dienst-Objekt) """ Lädt vorherige-ausgaben.csv für Fairness-Constraints Args: datei: Pfad zur vorherige-ausgaben.csv eltern: Liste der Elternnamen dienste: Liste der Dienst-Objekte Returns: Liste der historischen Dienste """ print(f"Lade vorherige Ausgaben aus {datei}...") historische_dienste = [] try: with open(datei, 'r', encoding='utf-8') as f: reader = csv.reader(f) header = next(reader) # Dienst-Spalten finden dienst_spalten = {} for i, col_name in enumerate(header[2:], 2): # Ab Spalte 2 (nach Datum, Wochentag) for dienst in dienste: if dienst.name.lower() in col_name.lower() or dienst.kuerzel == col_name: dienst_spalten[dienst] = i break for row in reader: if len(row) < 3: continue # Datum parsen try: datum = datetime.strptime(row[0].strip(), '%Y-%m-%d').date() except ValueError: continue # Zugeteilte Dienste mit Datum speichern for dienst, spalte_idx in dienst_spalten.items(): if spalte_idx < len(row) and row[spalte_idx].strip(): # Mehrere Eltern können in einer Zelle stehen (durch " und " getrennt) eltern_liste = row[spalte_idx].strip().split(' und ') for eltern_name in eltern_liste: if eltern_name in eltern: # Historische Dienste mit Datum speichern historische_dienste.append((datum, eltern_name, dienst)) except FileNotFoundError: print("Keine vorherigen Ausgaben gefunden - starte ohne historische Daten") print(f"Historische Dienste mit Datum: {len(historische_dienste)} Einträge") return historische_dienste class AusgabeWriter: """Schreibt Optimierungsergebnisse in CSV-Dateien""" @staticmethod def schreibe_ausgabe_csv( datei: str, lösung: Dict[date, Dict[any, List[str]]], # Dienst-Objekt als Key tage: List[date], dienste: List # List[Dienst] ) -> None: """ Schreibt die Lösung in die ausgabe.csv Args: datei: Pfad zur ausgabe.csv lösung: Dictionary mit Zuteilungen {datum: {dienst: [eltern]}} tage: Liste aller Planungstage dienste: Liste der Dienst-Objekte """ print(f"Schreibe Ergebnisse nach {datei}...") with open(datei, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) # Header schreiben header = ['Datum', 'Wochentag'] + [dienst.name for dienst in dienste] writer.writerow(header) # Daten schreiben for tag in sorted(tage): wochentag = ['Montag', 'Dienstag', 'Mittwoch', 'Donnerstag', 'Freitag', 'Samstag', 'Sonntag'][tag.weekday()] row = [tag.strftime('%Y-%m-%d'), wochentag] for dienst in dienste: if tag in lösung and dienst in lösung[tag]: eltern_str = ' und '.join(lösung[tag][dienst]) else: eltern_str = '' row.append(eltern_str) writer.writerow(row) print("Ausgabe erfolgreich geschrieben!") # Hilfsfunktionen def _parse_präferenzen_string( eltern: str, datum: date, präf_str: str, präferenzen: Dict, dienste_lookup ) -> None: """Parst Präferenzstring wie 'F+P-E+' und fügt zu präferenzen hinzu""" i = 0 while i < len(präf_str): if i + 1 < len(präf_str): dienst = dienste_lookup(präf_str[i]) if dienst and i + 1 < len(präf_str): if präf_str[i + 1] == '+': präferenzen[(eltern, datum, dienst)] = 1 i += 2 elif präf_str[i + 1] == '-': präferenzen[(eltern, datum, dienst)] = -1 i += 2 else: i += 1 else: i += 1 else: i += 1