aboutsummaryrefslogtreecommitdiff
path: root/build_vitrasa/build_static_feed.py
diff options
context:
space:
mode:
authorAriel Costas Guerrero <ariel@costas.dev>2026-03-26 09:53:39 +0100
committerAriel Costas Guerrero <ariel@costas.dev>2026-03-26 09:53:52 +0100
commit84db1ca075dc63ccb02da825948d95ad09f94e4d (patch)
treeb9678bec669304bb3201a1e40a86bb3828150fac /build_vitrasa/build_static_feed.py
parent291450f2add8ddd6ed8757b2bdbfceb476be3033 (diff)
Convert submodules to regular repo files, add custom feeds
Diffstat (limited to 'build_vitrasa/build_static_feed.py')
-rw-r--r--build_vitrasa/build_static_feed.py191
1 file changed, 191 insertions(+), 0 deletions(-)
diff --git a/build_vitrasa/build_static_feed.py b/build_vitrasa/build_static_feed.py
new file mode 100644
index 0000000..3b7fb7e
--- /dev/null
+++ b/build_vitrasa/build_static_feed.py
@@ -0,0 +1,191 @@
+# /// script
+# requires-python = ">=3.12"
+# dependencies = [
+# "requests"
+# ]
+# ///
+
+from argparse import ArgumentParser
+import csv
+from datetime import date, datetime, timedelta
+import json
+import logging
+import os
+import shutil
+import tempfile
+import zipfile
+
+import requests
+
+
def get_rows(input_file: str) -> list[dict]:
    """Read a GTFS CSV file and return its rows as a list of dicts.

    Header names are stripped of surrounding whitespace (some feeds pad
    their column names), but cell values are returned untouched.

    Args:
        input_file: Path to a CSV file with a header row.

    Returns:
        One dict per data row, keyed by the stripped header names.
        An empty list if the file has no header at all.
    """
    with open(input_file, "r", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        # A completely empty file yields no fieldnames; nothing to read.
        if reader.fieldnames is None:
            return []
        reader.fieldnames = [name.strip() for name in reader.fieldnames]
        # DictReader is already an iterator of dicts; no manual append loop.
        return list(reader)
+
+
if __name__ == "__main__":
    parser = ArgumentParser()
    parser.add_argument(
        "--debug",
        help="Enable debug logging",
        action="store_true",
    )

    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.debug else logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
    )

    # Temp locations: downloaded zip, extracted input feed, rebuilt output feed.
    INPUT_GTFS_FD, INPUT_GTFS_ZIP = tempfile.mkstemp(suffix=".zip", prefix="vigo_in_")
    # We only need the path; the file is reopened by name below.  Closing the
    # fd immediately fixes it being held open for the entire run (previously
    # it was only closed at the very end, and leaked on any earlier exception).
    os.close(INPUT_GTFS_FD)
    INPUT_GTFS_PATH = tempfile.mkdtemp(prefix="vigo_in_")
    OUTPUT_GTFS_PATH = tempfile.mkdtemp(prefix="vigo_out_")
    OUTPUT_GTFS_ZIP = os.path.join(os.path.dirname(__file__), "gtfs_vigo.zip")

    # BUG FIX: was an f-string with no placeholders.
    FEED_URL = "https://datos.vigo.org/data/transporte/gtfs_vigo.zip"

    try:
        logging.info("Downloading GTFS feed from '%s'...", FEED_URL)
        # BUG FIX: requests.get() without a timeout can block forever; also
        # fail fast on HTTP errors instead of silently zipping an error page.
        response = requests.get(FEED_URL, timeout=120)
        response.raise_for_status()
        with open(INPUT_GTFS_ZIP, "wb") as f:
            f.write(response.content)

        # Unzip the GTFS feed
        with zipfile.ZipFile(INPUT_GTFS_ZIP, "r") as zip_ref:
            zip_ref.extractall(INPUT_GTFS_PATH)

        # Build calendar.txt from calendar_dates.txt:
        # infer each service's weekly pattern from which actual dates it ran on.
        # The "reference weekday" is the weekday date with the most active
        # services (i.e. the most likely normal working day, avoiding holidays).
        # Saturday and Sunday services are inferred from the Saturday/Sunday
        # dates present.
        CALENDAR_DATES_FILE = os.path.join(INPUT_GTFS_PATH, "calendar_dates.txt")

        # service_id -> set of YYYYMMDD date strings (exception_type=1 only)
        service_dates: dict[str, set[str]] = {}
        for row in get_rows(CALENDAR_DATES_FILE):
            if row.get("exception_type", "").strip() != "1":
                continue
            sid = row["service_id"].strip()
            d = row["date"].strip()
            service_dates.setdefault(sid, set()).add(d)

        logging.debug(
            "Found %d service IDs in calendar_dates.txt", len(service_dates)
        )

        def _parse_date(d: str) -> date:
            """Parse a GTFS YYYYMMDD date string into a datetime.date."""
            return datetime.strptime(d, "%Y%m%d").date()

        all_dates: set[str] = {d for dates in service_dates.values() for d in dates}

        # ROBUSTNESS: with no active dates at all, min() below would raise a
        # bare ValueError; abort with a clear diagnostic instead.
        if not all_dates:
            logging.error(
                "No active dates (exception_type=1) found in calendar_dates.txt"
            )
            raise SystemExit(1)

        # Group dates by day-of-week (0=Mon … 6=Sun)
        dates_by_dow: dict[int, list[str]] = {}
        for d in all_dates:
            dates_by_dow.setdefault(_parse_date(d).weekday(), []).append(d)

        saturday_dates: set[str] = set(dates_by_dow.get(5, []))
        sunday_dates: set[str] = set(dates_by_dow.get(6, []))
        weekday_dates: set[str] = set()
        for _dow in range(5):
            weekday_dates.update(dates_by_dow.get(_dow, []))

        # Pick the weekday date where the most services run (most "normal"
        # working day). Days with fewer services are likely public holidays.
        weekday_svc_counts: dict[str, int] = {
            d: sum(1 for dates in service_dates.values() if d in dates)
            for d in weekday_dates
        }
        if weekday_svc_counts:
            ref_weekday = max(weekday_svc_counts, key=weekday_svc_counts.__getitem__)
            logging.info(
                "Reference weekday: %s (%s) with %d active services",
                ref_weekday,
                _parse_date(ref_weekday).strftime("%A"),
                weekday_svc_counts[ref_weekday],
            )
        else:
            ref_weekday = None
            logging.warning("No weekday dates found in calendar_dates.txt")

        feed_start = min(_parse_date(d) for d in all_dates)
        feed_end = feed_start + timedelta(days=365)

        calendar_output_rows: list[dict] = []
        for sid, dates in service_dates.items():
            # A service is a weekday service iff it ran on the reference
            # weekday; Sat/Sun membership comes from any matching date.
            is_weekday = ref_weekday is not None and ref_weekday in dates
            is_saturday = bool(dates & saturday_dates)
            is_sunday = bool(dates & sunday_dates)

            if not (is_weekday or is_saturday or is_sunday):
                logging.warning(
                    "Service %r has no day-type classification, skipping", sid
                )
                continue

            wd = "1" if is_weekday else "0"
            sat = "1" if is_saturday else "0"
            sun = "1" if is_sunday else "0"
            calendar_output_rows.append({
                "service_id": sid,
                "monday": wd,
                "tuesday": wd,
                "wednesday": wd,
                "thursday": wd,
                "friday": wd,
                "saturday": sat,
                "sunday": sun,
                # 2 days before feed start, so feeds published early don't mess it up
                "start_date": (feed_start - timedelta(days=2)).strftime("%Y%m%d"),
                "end_date": feed_end.strftime("%Y%m%d"),
            })

        logging.info("Generated %d calendar.txt entries", len(calendar_output_rows))

        # Copy every file in the feed except calendar_dates.txt / calendar.txt
        # (we replace them with a freshly generated calendar.txt above)
        for filename in os.listdir(INPUT_GTFS_PATH):
            if not filename.endswith(".txt"):
                continue
            if filename in ("calendar_dates.txt", "calendar.txt"):
                continue

            src_path = os.path.join(INPUT_GTFS_PATH, filename)
            dest_path = os.path.join(OUTPUT_GTFS_PATH, filename)
            shutil.copy(src_path, dest_path)

        CALENDAR_OUTPUT_FILE = os.path.join(OUTPUT_GTFS_PATH, "calendar.txt")
        with open(CALENDAR_OUTPUT_FILE, "w", encoding="utf-8", newline="") as f:
            fieldnames = [
                "service_id", "monday", "tuesday", "wednesday", "thursday",
                "friday", "saturday", "sunday", "start_date", "end_date",
            ]
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(calendar_output_rows)

        # Create a ZIP archive of the output GTFS
        with zipfile.ZipFile(OUTPUT_GTFS_ZIP, "w", zipfile.ZIP_DEFLATED) as zipf:
            for root, _, files in os.walk(OUTPUT_GTFS_PATH):
                for file in files:
                    file_path = os.path.join(root, file)
                    arcname = os.path.relpath(file_path, OUTPUT_GTFS_PATH)
                    zipf.write(file_path, arcname)

        logging.info(
            "GTFS data from feed has been zipped successfully at %s.",
            OUTPUT_GTFS_ZIP,
        )
    finally:
        # ROBUSTNESS: temp artifacts were previously only removed on the
        # success path, leaking on any exception. Clean up best-effort.
        if os.path.exists(INPUT_GTFS_ZIP):
            os.remove(INPUT_GTFS_ZIP)
        shutil.rmtree(INPUT_GTFS_PATH, ignore_errors=True)
        shutil.rmtree(OUTPUT_GTFS_PATH, ignore_errors=True)