diff options
| author | Ariel Costas Guerrero <ariel@costas.dev> | 2026-03-26 09:53:39 +0100 |
|---|---|---|
| committer | Ariel Costas Guerrero <ariel@costas.dev> | 2026-03-26 09:53:52 +0100 |
| commit | 84db1ca075dc63ccb02da825948d95ad09f94e4d (patch) | |
| tree | b9678bec669304bb3201a1e40a86bb3828150fac /build_vitrasa/build_static_feed.py | |
| parent | 291450f2add8ddd6ed8757b2bdbfceb476be3033 (diff) | |
Convert submodules to regular repo files, add custom feeds
Diffstat (limited to 'build_vitrasa/build_static_feed.py')
| -rw-r--r-- | build_vitrasa/build_static_feed.py | 191 |
1 file changed, 191 insertions(+), 0 deletions(-)
# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "requests"
# ]
# ///

"""Build a static GTFS feed for Vigo (Vitrasa).

Downloads the official GTFS zip, replaces its date-by-date
``calendar_dates.txt`` with an inferred weekly ``calendar.txt``, and
re-zips the result next to this script as ``gtfs_vigo.zip``.
"""

from argparse import ArgumentParser
import csv
from datetime import date, datetime, timedelta
import json
import logging
import os
import shutil
import tempfile
import zipfile


def get_rows(input_file: str) -> list[dict]:
    """Read a GTFS CSV file and return its rows as dicts.

    Header names are stripped of surrounding whitespace because the
    upstream feed occasionally ships padded column names.  Returns an
    empty list for a header-less (empty) file.
    """
    with open(input_file, "r", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        if reader.fieldnames is None:
            return []
        reader.fieldnames = [name.strip() for name in reader.fieldnames]
        return list(reader)


def _parse_date(d: str) -> date:
    """Parse a GTFS ``YYYYMMDD`` date string into a ``date``."""
    return datetime.strptime(d, "%Y%m%d").date()


def load_service_dates(calendar_dates_file: str) -> dict[str, set[str]]:
    """Map each service_id to the set of YYYYMMDD dates it runs on.

    Only ``exception_type == 1`` (service added) rows are considered;
    removals and malformed rows are ignored.
    """
    service_dates: dict[str, set[str]] = {}
    for row in get_rows(calendar_dates_file):
        if row.get("exception_type", "").strip() != "1":
            continue
        sid = row["service_id"].strip()
        service_dates.setdefault(sid, set()).add(row["date"].strip())
    return service_dates


def build_calendar_rows(service_dates: dict[str, set[str]]) -> list[dict]:
    """Infer weekly ``calendar.txt`` rows from per-date service activity.

    Each service's weekly pattern is inferred from which actual dates it
    ran on.  The "reference weekday" is the Mon-Fri date with the most
    active services (i.e. the most likely normal working day, avoiding
    holidays); Saturday/Sunday service is inferred from the Sat/Sun dates
    present.  Services that match no day type are skipped with a warning.
    """
    if not service_dates:
        # BUG FIX: the original crashed with ValueError (min() of an empty
        # sequence) when calendar_dates.txt had no usable rows.
        logging.warning("No service dates found; emitting empty calendar")
        return []

    all_dates: set[str] = {d for dates in service_dates.values() for d in dates}

    # Group dates by day-of-week (0=Mon … 6=Sun).
    dates_by_dow: dict[int, list[str]] = {}
    for d in all_dates:
        dates_by_dow.setdefault(_parse_date(d).weekday(), []).append(d)

    saturday_dates: set[str] = set(dates_by_dow.get(5, []))
    sunday_dates: set[str] = set(dates_by_dow.get(6, []))
    weekday_dates: set[str] = set()
    for dow in range(5):
        weekday_dates.update(dates_by_dow.get(dow, []))

    # Pick the weekday date where the most services run (most "normal"
    # working day).  Days with fewer services are likely public holidays.
    weekday_svc_counts: dict[str, int] = {
        d: sum(1 for dates in service_dates.values() if d in dates)
        for d in weekday_dates
    }
    if weekday_svc_counts:
        ref_weekday = max(weekday_svc_counts, key=weekday_svc_counts.__getitem__)
        logging.info(
            f"Reference weekday: {ref_weekday} "
            f"({_parse_date(ref_weekday).strftime('%A')}) "
            f"with {weekday_svc_counts[ref_weekday]} active services"
        )
    else:
        ref_weekday = None
        logging.warning("No weekday dates found in calendar_dates.txt")

    feed_start = min(_parse_date(d) for d in all_dates)
    feed_end = feed_start + timedelta(days=365)

    calendar_rows: list[dict] = []
    for sid, dates in service_dates.items():
        is_weekday = ref_weekday is not None and ref_weekday in dates
        is_saturday = bool(dates & saturday_dates)
        is_sunday = bool(dates & sunday_dates)

        if not is_weekday and not is_saturday and not is_sunday:
            logging.warning(f"Service {sid!r} has no day-type classification, skipping")
            continue

        wd = "1" if is_weekday else "0"
        calendar_rows.append({
            "service_id": sid,
            "monday": wd,
            "tuesday": wd,
            "wednesday": wd,
            "thursday": wd,
            "friday": wd,
            "saturday": "1" if is_saturday else "0",
            "sunday": "1" if is_sunday else "0",
            # 2 days before feed start, so feeds published early don't mess it up
            "start_date": (feed_start - timedelta(days=2)).strftime("%Y%m%d"),
            "end_date": feed_end.strftime("%Y%m%d"),
        })
    return calendar_rows


def main() -> None:
    """Download, rewrite and re-zip the Vigo GTFS feed."""
    # Lazy import: lets this module be imported (e.g. to test get_rows /
    # build_calendar_rows) without the third-party dependency installed.
    import requests

    parser = ArgumentParser()
    parser.add_argument(
        "--debug",
        help="Enable debug logging",
        action="store_true"
    )
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.debug else logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
    )

    input_gtfs_fd, input_gtfs_zip = tempfile.mkstemp(suffix=".zip", prefix="vigo_in_")
    input_gtfs_path = tempfile.mkdtemp(prefix="vigo_in_")
    output_gtfs_path = tempfile.mkdtemp(prefix="vigo_out_")
    output_gtfs_zip = os.path.join(os.path.dirname(__file__), "gtfs_vigo.zip")

    # BUG FIX: was a placeholder-less f-string.
    feed_url = "https://datos.vigo.org/data/transporte/gtfs_vigo.zip"

    try:
        logging.info(f"Downloading GTFS feed from '{feed_url}'...")
        # BUG FIX: bounded timeout and explicit HTTP status check — the
        # original would happily zip up a 404/500 error page.
        response = requests.get(feed_url, timeout=60)
        response.raise_for_status()
        with open(input_gtfs_zip, "wb") as f:
            f.write(response.content)

        # Unzip the GTFS feed
        with zipfile.ZipFile(input_gtfs_zip, "r") as zip_ref:
            zip_ref.extractall(input_gtfs_path)

        service_dates = load_service_dates(
            os.path.join(input_gtfs_path, "calendar_dates.txt")
        )
        logging.debug(f"Found {len(service_dates)} service IDs in calendar_dates.txt")

        calendar_rows = build_calendar_rows(service_dates)
        logging.info(f"Generated {len(calendar_rows)} calendar.txt entries")

        # Copy every file in the feed except calendar_dates.txt / calendar.txt
        # (we replace them with a freshly generated calendar.txt above)
        for filename in os.listdir(input_gtfs_path):
            if not filename.endswith(".txt"):
                continue
            if filename in ("calendar_dates.txt", "calendar.txt"):
                continue
            shutil.copy(
                os.path.join(input_gtfs_path, filename),
                os.path.join(output_gtfs_path, filename),
            )

        calendar_output_file = os.path.join(output_gtfs_path, "calendar.txt")
        with open(calendar_output_file, "w", encoding="utf-8", newline="") as f:
            fieldnames = [
                "service_id", "monday", "tuesday", "wednesday", "thursday",
                "friday", "saturday", "sunday", "start_date", "end_date",
            ]
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(calendar_rows)

        # Create a ZIP archive of the output GTFS
        with zipfile.ZipFile(output_gtfs_zip, "w", zipfile.ZIP_DEFLATED) as zipf:
            for root, _, files in os.walk(output_gtfs_path):
                for file in files:
                    file_path = os.path.join(root, file)
                    arcname = os.path.relpath(file_path, output_gtfs_path)
                    zipf.write(file_path, arcname)

        logging.info(
            f"GTFS data from feed has been zipped successfully at {output_gtfs_zip}."
        )
    finally:
        # BUG FIX: cleanup ran only on the success path, leaking the temp
        # fd, zip and directories whenever anything above failed.
        os.close(input_gtfs_fd)
        if os.path.exists(input_gtfs_zip):
            os.remove(input_gtfs_zip)
        shutil.rmtree(input_gtfs_path, ignore_errors=True)
        shutil.rmtree(output_gtfs_path, ignore_errors=True)


if __name__ == "__main__":
    main()
