From b96273b54a9b47c79e0afe40a918f751e82097ae Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Sat, 22 Nov 2025 18:02:36 +0100 Subject: Link previous trip shapes for GPS positioning on circular routes (#111) Co-authored-by: Ariel Costas Guerrero Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: arielcostas <94913521+arielcostas@users.noreply.github.com> --- src/gtfs_vigo_stops/stop_report.py | 138 ++++++++++++++++++++++++++++++++++++- 1 file changed, 135 insertions(+), 3 deletions(-) (limited to 'src/gtfs_vigo_stops/stop_report.py') diff --git a/src/gtfs_vigo_stops/stop_report.py b/src/gtfs_vigo_stops/stop_report.py index a827eaa..cee11ea 100644 --- a/src/gtfs_vigo_stops/stop_report.py +++ b/src/gtfs_vigo_stops/stop_report.py @@ -4,7 +4,7 @@ import shutil import sys import time import traceback -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional, Tuple from src.shapes import process_shapes from src.common import get_all_feed_dates @@ -13,10 +13,10 @@ from src.logger import get_logger from src.report_writer import write_stop_json, write_stop_protobuf from src.routes import load_routes from src.services import get_active_services -from src.stop_times import get_stops_for_trips +from src.stop_times import get_stops_for_trips, StopTime from src.stops import get_all_stops, get_all_stops_by_code, get_numeric_code from src.street_name import get_street_name, normalise_stop_name -from src.trips import get_trips_for_services +from src.trips import get_trips_for_services, TripLine logger = get_logger("stop_report") @@ -143,6 +143,130 @@ def is_next_day_service(time_str: str) -> bool: return False +def parse_trip_id_components(trip_id: str) -> Optional[Tuple[str, str, int]]: + """ + Parse a trip ID in format XXXYYY-Z or XXXYYY_Z where: + - XXX = line number (e.g., 003) + - YYY = shift/internal ID (e.g., 001) + - Z = trip number (e.g., 12) + + Supported formats: + 1. ..._XXXYYY_Z (e.g. "C1 01SNA00_001001_18") + 2. ..._XXXYYY-Z (e.g. "VIGO_20241122_003001-12") + + Returns tuple of (line, shift_id, trip_number) or None if parsing fails. + """ + try: + parts = trip_id.split("_") + if len(parts) < 2: + return None + + # Try format 1: ..._XXXYYY_Z + # Check if second to last part is 6 digits (XXXYYY) and last part is numeric + if len(parts) >= 2: + shift_part = parts[-2] + trip_num_str = parts[-1] + + if len(shift_part) == 6 and shift_part.isdigit() and trip_num_str.isdigit(): + line = shift_part[:3] + shift_id = shift_part[3:6] + trip_number = int(trip_num_str) + return (line, shift_id, trip_number) + + # Try format 2: ..._XXXYYY-Z + # The trip ID is the last part in format XXXYYY-Z + trip_part = parts[-1] + + if "-" in trip_part: + shift_part, trip_num_str = trip_part.split("-", 1) + + # shift_part should be 6 digits: XXXYYY + if len(shift_part) == 6 and shift_part.isdigit(): + line = shift_part[:3] # First 3 digits + shift_id = shift_part[3:6] # Next 3 digits + trip_number = int(trip_num_str) + return (line, shift_id, trip_number) + + return None + except (ValueError, IndexError): + return None + + +def build_trip_previous_shape_map( + trips: Dict[str, List[TripLine]], + stops_for_all_trips: Dict[str, List[StopTime]], +) -> Dict[str, Optional[str]]: + """ + Build a mapping from trip_id to previous_trip_shape_id. + + Links trips based on trip ID structure (XXXYYY-Z) where trips with the same + XXX (line) and YYY (shift) and sequential Z (trip numbers) are connected + if the terminus of trip N matches the start of trip N+1. + + Args: + trips: Dictionary of service_id -> list of trips + stops_for_all_trips: Dictionary of trip_id -> list of stop times + + Returns: + Dictionary mapping trip_id to previous_trip_shape_id (or None) + """ + trip_previous_shape: Dict[str, Optional[str]] = {} + + # Collect all trips across all services + all_trips_list: List[TripLine] = [] + for trip_list in trips.values(): + all_trips_list.extend(trip_list) + + # Group trips by shift ID (line + shift combination) + trips_by_shift: Dict[str, List[Tuple[TripLine, int, str, str]]] = {} + + for trip in all_trips_list: + parsed = parse_trip_id_components(trip.trip_id) + if not parsed: + continue + + line, shift_id, trip_number = parsed + shift_key = f"{line}{shift_id}" + + trip_stops = stops_for_all_trips.get(trip.trip_id) + if not trip_stops or len(trip_stops) < 2: + continue + + first_stop = trip_stops[0] + last_stop = trip_stops[-1] + + if shift_key not in trips_by_shift: + trips_by_shift[shift_key] = [] + + trips_by_shift[shift_key].append(( + trip, + trip_number, + first_stop.stop_id, + last_stop.stop_id + )) + + # For each shift, sort trips by trip number and link consecutive trips + for shift_key, shift_trips in trips_by_shift.items(): + # Sort by trip number + shift_trips.sort(key=lambda x: x[1]) + + # Link consecutive trips if their stops match + for i in range(1, len(shift_trips)): + current_trip, current_num, current_start_stop, _ = shift_trips[i] + prev_trip, prev_num, _, prev_end_stop = shift_trips[i - 1] + + # Check if trips are consecutive (trip numbers differ by 1), + # if previous trip's terminus matches current trip's start, + # and if both trips have valid shape IDs + if (current_num == prev_num + 1 and + prev_end_stop == current_start_stop and + prev_trip.shape_id and + current_trip.shape_id): + trip_previous_shape[current_trip.trip_id] = prev_trip.shape_id + + return trip_previous_shape + + def get_stop_arrivals(feed_dir: str, date: str) -> Dict[str, List[Dict[str, Any]]]: """ Process trips for the given date and organize stop arrivals. @@ -192,6 +316,10 @@ def get_stop_arrivals(feed_dir: str, date: str) -> Dict[str, List[Dict[str, Any] stops_for_all_trips = get_stops_for_trips(feed_dir, all_trip_ids) logger.info(f"Precomputed stops for {len(stops_for_all_trips)} trips.") + # Build mapping from trip_id to previous trip's shape_id + trip_previous_shape_map = build_trip_previous_shape_map(trips, stops_for_all_trips) + logger.info(f"Built previous trip shape mapping for {len(trip_previous_shape_map)} trips.") + # Load routes information routes = load_routes(feed_dir) logger.info(f"Loaded {len(routes)} routes from feed.") @@ -335,6 +463,9 @@ def get_stop_arrivals(feed_dir: str, date: str) -> Dict[str, List[Dict[str, Any] next_streets = [] trip_id_fmt = "_".join(trip_id.split("_")[1:3]) + + # Get previous trip shape_id if available + previous_trip_shape_id = trip_previous_shape_map.get(trip_id, "") stop_arrivals[stop_code].append( { @@ -356,6 +487,7 @@ def get_stop_arrivals(feed_dir: str, date: str) -> Dict[str, List[Dict[str, Any] "terminus_code": terminus_code, "terminus_name": terminus_name, "terminus_time": final_terminus_time, + "previous_trip_shape_id": previous_trip_shape_id, } ) -- cgit v1.3