diff options
Diffstat (limited to 'src/gtfs_vigo_stops/stop_report.py')
| -rw-r--r-- | src/gtfs_vigo_stops/stop_report.py | 203 |
1 files changed, 132 insertions, 71 deletions
diff --git a/src/gtfs_vigo_stops/stop_report.py b/src/gtfs_vigo_stops/stop_report.py index da3a5d7..a827eaa 100644 --- a/src/gtfs_vigo_stops/stop_report.py +++ b/src/gtfs_vigo_stops/stop_report.py @@ -31,7 +31,8 @@ def parse_args(): default="./output/", help="Directory to write reports to (default: ./output/)", ) - parser.add_argument("--feed-dir", type=str, help="Path to the feed directory") + parser.add_argument("--feed-dir", type=str, + help="Path to the feed directory") parser.add_argument( "--feed-url", type=str, @@ -78,20 +79,20 @@ def normalize_gtfs_time(time_str: str) -> str: """ Normalize GTFS time format to standard HH:MM:SS (0-23 hours). Converts times like 25:30:00 to 01:30:00. - + Args: time_str: Time in HH:MM:SS format, possibly with hours >= 24 - + Returns: Normalized time string in HH:MM:SS format """ if not time_str: return time_str - + parts = time_str.split(":") if len(parts) != 3: return time_str - + try: hours, minutes, seconds = map(int, parts) normalized_hours = hours % 24 @@ -100,23 +101,41 @@ def normalize_gtfs_time(time_str: str) -> str: return time_str +def format_gtfs_time(time_str: str) -> str: + """ + Format GTFS time to HH:MM:SS, preserving hours >= 24. + """ + if not time_str: + return time_str + + parts = time_str.split(":") + if len(parts) != 3: + return time_str + + try: + hours, minutes, seconds = map(int, parts) + return f"{hours:02d}:{minutes:02d}:{seconds:02d}" + except ValueError: + return time_str + + def is_next_day_service(time_str: str) -> bool: """ Check if a GTFS time represents a service on the next day (hours >= 24). - + Args: time_str: Time in HH:MM:SS format - + Returns: True if the time is >= 24:00:00, False otherwise """ if not time_str: return False - + parts = time_str.split(":") if len(parts) != 3: return False - + try: hours = int(parts[0]) return hours >= 24 @@ -137,23 +156,26 @@ def get_stop_arrivals(feed_dir: str, date: str) -> Dict[str, List[Dict[str, Any] Dictionary mapping stop_code to lists of arrival information. """ from datetime import datetime, timedelta - + stops = get_all_stops(feed_dir) logger.info(f"Found {len(stops)} stops in the feed.") active_services = get_active_services(feed_dir, date) if not active_services: logger.info("No active services found for the given date.") - - logger.info(f"Found {len(active_services)} active services for date {date}.") - + + logger.info( + f"Found {len(active_services)} active services for date {date}.") + # Also get services from the previous day to include night services (times >= 24:00) - prev_date = (datetime.strptime(date, "%Y-%m-%d") - timedelta(days=1)).strftime("%Y-%m-%d") + prev_date = (datetime.strptime(date, "%Y-%m-%d") - + timedelta(days=1)).strftime("%Y-%m-%d") prev_services = get_active_services(feed_dir, prev_date) - logger.info(f"Found {len(prev_services)} active services for previous date {prev_date} (for night services).") - + logger.info( + f"Found {len(prev_services)} active services for previous date {prev_date} (for night services).") + all_services = list(set(active_services + prev_services)) - + if not all_services: logger.info("No active services found for current or previous date.") return {} @@ -163,7 +185,8 @@ def get_stop_arrivals(feed_dir: str, date: str) -> Dict[str, List[Dict[str, Any] logger.info(f"Found {total_trip_count} trips for active services.") # Get all trip IDs - all_trip_ids = [trip.trip_id for trip_list in trips.values() for trip in trip_list] + all_trip_ids = [trip.trip_id for trip_list in trips.values() + for trip in trip_list] # Get stops for all trips stops_for_all_trips = get_stops_for_trips(feed_dir, all_trip_ids) @@ -182,10 +205,16 @@ def get_stop_arrivals(feed_dir: str, date: str) -> Dict[str, List[Dict[str, Any] # Organize data by stop_code stop_arrivals = {} + active_services_set = set(active_services) + prev_services_set = set(prev_services) + for service_id, trip_list in trips.items(): - # Determine if this service is from the previous day - is_prev_day_service = service_id in prev_services and service_id not in active_services - + is_active = service_id in active_services_set + is_prev = service_id in prev_services_set + + if not is_active and not is_prev: + continue + for trip in trip_list: # Get route information once per trip route_info = routes.get(trip.route_id, {}) @@ -222,7 +251,8 @@ def get_stop_arrivals(feed_dir: str, date: str) -> Dict[str, List[Dict[str, Any] stop_to_segment_idx.append(len(segment_names) - 1) # Precompute future street transitions per segment - future_suffix_by_segment: list[tuple[str, ...]] = [()] * len(segment_names) + future_suffix_by_segment: list[tuple[str, ...]] = [ + ()] * len(segment_names) future_tuple: tuple[str, ...] = () for idx in range(len(segment_names) - 1, -1, -1): future_suffix_by_segment[idx] = future_tuple @@ -241,64 +271,93 @@ def get_stop_arrivals(feed_dir: str, date: str) -> Dict[str, List[Dict[str, Any] starting_stop_name = first_stop.stop_name if first_stop else "Unknown Stop" terminus_stop_name = last_stop.stop_name if last_stop else "Unknown Stop" - starting_code = get_numeric_code(first_stop.stop_code) if first_stop else "" - terminus_code = get_numeric_code(last_stop.stop_code) if last_stop else "" + starting_code = get_numeric_code( + first_stop.stop_code) if first_stop else "" + terminus_code = get_numeric_code( + last_stop.stop_code) if last_stop else "" starting_name = normalise_stop_name(starting_stop_name) terminus_name = normalise_stop_name(terminus_stop_name) starting_time = first_stop_time.departure_time terminus_time = last_stop_time.arrival_time - for i, (stop_time, _) in enumerate(trip_stop_pairs): - stop_code = stop_id_to_code.get(stop_time.stop_id) + # Determine processing passes for this trip + passes = [] + if is_active: + passes.append("current") + if is_prev: + passes.append("previous") - if not stop_code: - continue # Skip stops without a code - - # Filter based on whether this is from previous day's service - # For previous day services: only include if calling_time >= 24:00:00 (night services rolling to this day) - # For current day services: include ALL times (both < 24:00 and >= 24:00) - if is_prev_day_service: - if not is_next_day_service(stop_time.departure_time): - continue # Skip times < 24:00 from previous day + for mode in passes: + is_current_mode = (mode == "current") - if stop_code not in stop_arrivals: - stop_arrivals[stop_code] = [] + for i, (stop_time, _) in enumerate(trip_stop_pairs): + stop_code = stop_id_to_code.get(stop_time.stop_id) - if segment_names: - segment_idx = stop_to_segment_idx[i] - if segment_idx not in segment_future_lists: - segment_future_lists[segment_idx] = list( - future_suffix_by_segment[segment_idx] - ) - next_streets = segment_future_lists[segment_idx].copy() - else: - next_streets = [] + if not stop_code: + continue # Skip stops without a code - trip_id_fmt = "_".join(trip_id.split("_")[1:3]) + dep_time = stop_time.departure_time - stop_arrivals[stop_code].append( - { - "service_id": service_id.split("_")[1], - "trip_id": trip_id_fmt, - "line": route_short_name, - "route": trip_headsign, - "shape_id": getattr(trip, "shape_id", ""), - "stop_sequence": stop_time.stop_sequence, - "shape_dist_traveled": getattr( - stop_time, "shape_dist_traveled", 0 - ), - "next_streets": next_streets, - "starting_code": starting_code, - "starting_name": starting_name, - "starting_time": normalize_gtfs_time(starting_time), - "calling_time": normalize_gtfs_time(stop_time.departure_time), - "calling_ssm": time_to_seconds(normalize_gtfs_time(stop_time.departure_time)), - "terminus_code": terminus_code, - "terminus_name": terminus_name, - "terminus_time": normalize_gtfs_time(terminus_time), - } - ) + if not is_current_mode: + # Previous day service: only include if calling_time >= 24:00:00 (night services rolling to this day) + if not is_next_day_service(dep_time): + continue + + # Normalize times for display on current day (e.g. 25:30 -> 01:30) + final_starting_time = normalize_gtfs_time( + starting_time) + final_calling_time = normalize_gtfs_time(dep_time) + final_terminus_time = normalize_gtfs_time( + terminus_time) + # SSM should be small (early morning) + final_calling_ssm = time_to_seconds(final_calling_time) + else: + # Current day service: include ALL times + # Keep times as is (e.g. 25:30 stays 25:30) + final_starting_time = format_gtfs_time(starting_time) + final_calling_time = format_gtfs_time(dep_time) + final_terminus_time = format_gtfs_time(terminus_time) + # SSM should be large if > 24:00 + final_calling_ssm = time_to_seconds(dep_time) + + if stop_code not in stop_arrivals: + stop_arrivals[stop_code] = [] + + if segment_names: + segment_idx = stop_to_segment_idx[i] + if segment_idx not in segment_future_lists: + segment_future_lists[segment_idx] = list( + future_suffix_by_segment[segment_idx] + ) + next_streets = segment_future_lists[segment_idx].copy() + else: + next_streets = [] + + trip_id_fmt = "_".join(trip_id.split("_")[1:3]) + + stop_arrivals[stop_code].append( + { + "service_id": service_id.split("_")[1], + "trip_id": trip_id_fmt, + "line": route_short_name, + "route": trip_headsign, + "shape_id": getattr(trip, "shape_id", ""), + "stop_sequence": stop_time.stop_sequence, + "shape_dist_traveled": getattr( + stop_time, "shape_dist_traveled", 0 + ), + "next_streets": next_streets, + "starting_code": starting_code, + "starting_name": starting_name, + "starting_time": final_starting_time, + "calling_time": final_calling_time, + "calling_ssm": final_calling_ssm, + "terminus_code": terminus_code, + "terminus_name": terminus_name, + "terminus_time": final_terminus_time, + } + ) # Sort each stop's arrivals by arrival time for stop_code in stop_arrivals: @@ -388,7 +447,8 @@ def main(): feed_dir = args.feed_dir else: logger.info(f"Downloading GTFS feed from {feed_url}...") - feed_dir = download_feed_from_url(feed_url, output_dir, args.force_download) + feed_dir = download_feed_from_url( + feed_url, output_dir, args.force_download) if feed_dir is None: logger.info("Download was skipped (feed not modified). Exiting.") return @@ -413,7 +473,8 @@ def main(): _, stop_summary = process_date(feed_dir, date, output_dir) all_stops_summary[date] = stop_summary - logger.info("Finished processing all dates. Beginning with shape transformation.") + logger.info( + "Finished processing all dates. Beginning with shape transformation.") # Process shapes, converting each coordinate to EPSG:25829 and saving as Protobuf process_shapes(feed_dir, output_dir) |
