aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/gtfs_vigo_stops/stop_report.py203
1 files changed, 132 insertions, 71 deletions
diff --git a/src/gtfs_vigo_stops/stop_report.py b/src/gtfs_vigo_stops/stop_report.py
index da3a5d7..a827eaa 100644
--- a/src/gtfs_vigo_stops/stop_report.py
+++ b/src/gtfs_vigo_stops/stop_report.py
@@ -31,7 +31,8 @@ def parse_args():
default="./output/",
help="Directory to write reports to (default: ./output/)",
)
- parser.add_argument("--feed-dir", type=str, help="Path to the feed directory")
+ parser.add_argument("--feed-dir", type=str,
+ help="Path to the feed directory")
parser.add_argument(
"--feed-url",
type=str,
@@ -78,20 +79,20 @@ def normalize_gtfs_time(time_str: str) -> str:
"""
Normalize GTFS time format to standard HH:MM:SS (0-23 hours).
Converts times like 25:30:00 to 01:30:00.
-
+
Args:
time_str: Time in HH:MM:SS format, possibly with hours >= 24
-
+
Returns:
Normalized time string in HH:MM:SS format
"""
if not time_str:
return time_str
-
+
parts = time_str.split(":")
if len(parts) != 3:
return time_str
-
+
try:
hours, minutes, seconds = map(int, parts)
normalized_hours = hours % 24
@@ -100,23 +101,41 @@ def normalize_gtfs_time(time_str: str) -> str:
return time_str
+def format_gtfs_time(time_str: str) -> str:
+ """
+ Format GTFS time to HH:MM:SS, preserving hours >= 24.
+ """
+ if not time_str:
+ return time_str
+
+ parts = time_str.split(":")
+ if len(parts) != 3:
+ return time_str
+
+ try:
+ hours, minutes, seconds = map(int, parts)
+ return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
+ except ValueError:
+ return time_str
+
+
def is_next_day_service(time_str: str) -> bool:
"""
Check if a GTFS time represents a service on the next day (hours >= 24).
-
+
Args:
time_str: Time in HH:MM:SS format
-
+
Returns:
True if the time is >= 24:00:00, False otherwise
"""
if not time_str:
return False
-
+
parts = time_str.split(":")
if len(parts) != 3:
return False
-
+
try:
hours = int(parts[0])
return hours >= 24
@@ -137,23 +156,26 @@ def get_stop_arrivals(feed_dir: str, date: str) -> Dict[str, List[Dict[str, Any]
Dictionary mapping stop_code to lists of arrival information.
"""
from datetime import datetime, timedelta
-
+
stops = get_all_stops(feed_dir)
logger.info(f"Found {len(stops)} stops in the feed.")
active_services = get_active_services(feed_dir, date)
if not active_services:
logger.info("No active services found for the given date.")
-
- logger.info(f"Found {len(active_services)} active services for date {date}.")
-
+
+ logger.info(
+ f"Found {len(active_services)} active services for date {date}.")
+
# Also get services from the previous day to include night services (times >= 24:00)
- prev_date = (datetime.strptime(date, "%Y-%m-%d") - timedelta(days=1)).strftime("%Y-%m-%d")
+ prev_date = (datetime.strptime(date, "%Y-%m-%d") -
+ timedelta(days=1)).strftime("%Y-%m-%d")
prev_services = get_active_services(feed_dir, prev_date)
- logger.info(f"Found {len(prev_services)} active services for previous date {prev_date} (for night services).")
-
+ logger.info(
+ f"Found {len(prev_services)} active services for previous date {prev_date} (for night services).")
+
all_services = list(set(active_services + prev_services))
-
+
if not all_services:
logger.info("No active services found for current or previous date.")
return {}
@@ -163,7 +185,8 @@ def get_stop_arrivals(feed_dir: str, date: str) -> Dict[str, List[Dict[str, Any]
logger.info(f"Found {total_trip_count} trips for active services.")
# Get all trip IDs
- all_trip_ids = [trip.trip_id for trip_list in trips.values() for trip in trip_list]
+ all_trip_ids = [trip.trip_id for trip_list in trips.values()
+ for trip in trip_list]
# Get stops for all trips
stops_for_all_trips = get_stops_for_trips(feed_dir, all_trip_ids)
@@ -182,10 +205,16 @@ def get_stop_arrivals(feed_dir: str, date: str) -> Dict[str, List[Dict[str, Any]
# Organize data by stop_code
stop_arrivals = {}
+ active_services_set = set(active_services)
+ prev_services_set = set(prev_services)
+
for service_id, trip_list in trips.items():
- # Determine if this service is from the previous day
- is_prev_day_service = service_id in prev_services and service_id not in active_services
-
+ is_active = service_id in active_services_set
+ is_prev = service_id in prev_services_set
+
+ if not is_active and not is_prev:
+ continue
+
for trip in trip_list:
# Get route information once per trip
route_info = routes.get(trip.route_id, {})
@@ -222,7 +251,8 @@ def get_stop_arrivals(feed_dir: str, date: str) -> Dict[str, List[Dict[str, Any]
stop_to_segment_idx.append(len(segment_names) - 1)
# Precompute future street transitions per segment
- future_suffix_by_segment: list[tuple[str, ...]] = [()] * len(segment_names)
+ future_suffix_by_segment: list[tuple[str, ...]] = [
+ ()] * len(segment_names)
future_tuple: tuple[str, ...] = ()
for idx in range(len(segment_names) - 1, -1, -1):
future_suffix_by_segment[idx] = future_tuple
@@ -241,64 +271,93 @@ def get_stop_arrivals(feed_dir: str, date: str) -> Dict[str, List[Dict[str, Any]
starting_stop_name = first_stop.stop_name if first_stop else "Unknown Stop"
terminus_stop_name = last_stop.stop_name if last_stop else "Unknown Stop"
- starting_code = get_numeric_code(first_stop.stop_code) if first_stop else ""
- terminus_code = get_numeric_code(last_stop.stop_code) if last_stop else ""
+ starting_code = get_numeric_code(
+ first_stop.stop_code) if first_stop else ""
+ terminus_code = get_numeric_code(
+ last_stop.stop_code) if last_stop else ""
starting_name = normalise_stop_name(starting_stop_name)
terminus_name = normalise_stop_name(terminus_stop_name)
starting_time = first_stop_time.departure_time
terminus_time = last_stop_time.arrival_time
- for i, (stop_time, _) in enumerate(trip_stop_pairs):
- stop_code = stop_id_to_code.get(stop_time.stop_id)
+ # Determine processing passes for this trip
+ passes = []
+ if is_active:
+ passes.append("current")
+ if is_prev:
+ passes.append("previous")
- if not stop_code:
- continue # Skip stops without a code
-
- # Filter based on whether this is from previous day's service
- # For previous day services: only include if calling_time >= 24:00:00 (night services rolling to this day)
- # For current day services: include ALL times (both < 24:00 and >= 24:00)
- if is_prev_day_service:
- if not is_next_day_service(stop_time.departure_time):
- continue # Skip times < 24:00 from previous day
+ for mode in passes:
+ is_current_mode = (mode == "current")
- if stop_code not in stop_arrivals:
- stop_arrivals[stop_code] = []
+ for i, (stop_time, _) in enumerate(trip_stop_pairs):
+ stop_code = stop_id_to_code.get(stop_time.stop_id)
- if segment_names:
- segment_idx = stop_to_segment_idx[i]
- if segment_idx not in segment_future_lists:
- segment_future_lists[segment_idx] = list(
- future_suffix_by_segment[segment_idx]
- )
- next_streets = segment_future_lists[segment_idx].copy()
- else:
- next_streets = []
+ if not stop_code:
+ continue # Skip stops without a code
- trip_id_fmt = "_".join(trip_id.split("_")[1:3])
+ dep_time = stop_time.departure_time
- stop_arrivals[stop_code].append(
- {
- "service_id": service_id.split("_")[1],
- "trip_id": trip_id_fmt,
- "line": route_short_name,
- "route": trip_headsign,
- "shape_id": getattr(trip, "shape_id", ""),
- "stop_sequence": stop_time.stop_sequence,
- "shape_dist_traveled": getattr(
- stop_time, "shape_dist_traveled", 0
- ),
- "next_streets": next_streets,
- "starting_code": starting_code,
- "starting_name": starting_name,
- "starting_time": normalize_gtfs_time(starting_time),
- "calling_time": normalize_gtfs_time(stop_time.departure_time),
- "calling_ssm": time_to_seconds(normalize_gtfs_time(stop_time.departure_time)),
- "terminus_code": terminus_code,
- "terminus_name": terminus_name,
- "terminus_time": normalize_gtfs_time(terminus_time),
- }
- )
+ if not is_current_mode:
+ # Previous day service: only include if calling_time >= 24:00:00 (night services rolling to this day)
+ if not is_next_day_service(dep_time):
+ continue
+
+ # Normalize times for display on current day (e.g. 25:30 -> 01:30)
+ final_starting_time = normalize_gtfs_time(
+ starting_time)
+ final_calling_time = normalize_gtfs_time(dep_time)
+ final_terminus_time = normalize_gtfs_time(
+ terminus_time)
+ # SSM should be small (early morning)
+ final_calling_ssm = time_to_seconds(final_calling_time)
+ else:
+ # Current day service: include ALL times
+ # Keep times as is (e.g. 25:30 stays 25:30)
+ final_starting_time = format_gtfs_time(starting_time)
+ final_calling_time = format_gtfs_time(dep_time)
+ final_terminus_time = format_gtfs_time(terminus_time)
+ # SSM should be large if > 24:00
+ final_calling_ssm = time_to_seconds(dep_time)
+
+ if stop_code not in stop_arrivals:
+ stop_arrivals[stop_code] = []
+
+ if segment_names:
+ segment_idx = stop_to_segment_idx[i]
+ if segment_idx not in segment_future_lists:
+ segment_future_lists[segment_idx] = list(
+ future_suffix_by_segment[segment_idx]
+ )
+ next_streets = segment_future_lists[segment_idx].copy()
+ else:
+ next_streets = []
+
+ trip_id_fmt = "_".join(trip_id.split("_")[1:3])
+
+ stop_arrivals[stop_code].append(
+ {
+ "service_id": service_id.split("_")[1],
+ "trip_id": trip_id_fmt,
+ "line": route_short_name,
+ "route": trip_headsign,
+ "shape_id": getattr(trip, "shape_id", ""),
+ "stop_sequence": stop_time.stop_sequence,
+ "shape_dist_traveled": getattr(
+ stop_time, "shape_dist_traveled", 0
+ ),
+ "next_streets": next_streets,
+ "starting_code": starting_code,
+ "starting_name": starting_name,
+ "starting_time": final_starting_time,
+ "calling_time": final_calling_time,
+ "calling_ssm": final_calling_ssm,
+ "terminus_code": terminus_code,
+ "terminus_name": terminus_name,
+ "terminus_time": final_terminus_time,
+ }
+ )
# Sort each stop's arrivals by arrival time
for stop_code in stop_arrivals:
@@ -388,7 +447,8 @@ def main():
feed_dir = args.feed_dir
else:
logger.info(f"Downloading GTFS feed from {feed_url}...")
- feed_dir = download_feed_from_url(feed_url, output_dir, args.force_download)
+ feed_dir = download_feed_from_url(
+ feed_url, output_dir, args.force_download)
if feed_dir is None:
logger.info("Download was skipped (feed not modified). Exiting.")
return
@@ -413,7 +473,8 @@ def main():
_, stop_summary = process_date(feed_dir, date, output_dir)
all_stops_summary[date] = stop_summary
- logger.info("Finished processing all dates. Beginning with shape transformation.")
+ logger.info(
+ "Finished processing all dates. Beginning with shape transformation.")
# Process shapes, converting each coordinate to EPSG:25829 and saving as Protobuf
process_shapes(feed_dir, output_dir)