diff options
| author | Ariel Costas Guerrero <ariel@costas.dev> | 2025-11-17 16:39:48 +0100 |
|---|---|---|
| committer | Ariel Costas Guerrero <ariel@costas.dev> | 2025-11-17 16:39:48 +0100 |
| commit | ef86a09ee8d8fc287b382cf9092af8726b44ceae (patch) | |
| tree | 448b493d80a63137a9d42c0ca0cd2fa643e810e0 /src/delay_collector/main.py | |
| parent | 4b9c57dc6547d0c9d105ac3767dcc90da758a25d (diff) | |
Drop support for Santiago de Compostela, add collection script
Diffstat (limited to 'src/delay_collector/main.py')
| -rw-r--r-- | src/delay_collector/main.py | 218 |
1 files changed, 218 insertions, 0 deletions
diff --git a/src/delay_collector/main.py b/src/delay_collector/main.py new file mode 100644 index 0000000..cab6cf6 --- /dev/null +++ b/src/delay_collector/main.py @@ -0,0 +1,218 @@ +import os +import sys +from datetime import datetime +from time import sleep, time +from zoneinfo import ZoneInfo +import logging +from pathlib import Path + +import requests + +from database import insert_observations + + +STOP_CODES = [ + 14227, # (Torrecedeira 86) + 8460, # (Torrecedeira 105) + 20206, # (Marqués Valladares ft. 19) + 14264, # (Urzáiz-Príncipe) + 8770, # (Urzáiz, 13) + 5610, # (Gran Vía, 12) + 5660, # (Gran Vía, 19) + 6940, # (Praza América 3) + 2780, # (Camelias, 135) + 8630, # (Travesía 7) + 8610, # (Travesía 8) + 5410, # (Fragoso, 12) + 1360, # (Castrelos, 16) + 8040, # (Sanjurjo Badía, 167) + 14132, # (Sanjurjo Badía, 252) +] + +FREQUENCY_SECONDS = int(os.getenv("FREQUENCY_SECONDS", "30")) +SERVICE_START_HOUR = int(os.getenv("SERVICE_START_HOUR", "7")) # 7 AM +SERVICE_START_MINUTE = int(os.getenv("SERVICE_START_MINUTE", "00")) # 7 AM +SERVICE_END_HOUR = int(os.getenv("SERVICE_END_HOUR", "23")) # 11 PM +SERVICE_END_MINUTE = int(os.getenv("SERVICE_END_MINUTE", "00")) # 11:30 PM +SERVICE_TIMEZONE = os.getenv("SERVICE_TIMEZONE", "Europe/Madrid") + + +def setup_logging(): + """Configure logging for daemon operation.""" + # Configure logging to both file and console + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=[ + logging.StreamHandler(sys.stdout) + ] + ) + return logging.getLogger(__name__) + + +def is_within_service_hours() -> bool: + """Check if current time is within configured service hours (Madrid time).""" + tz = ZoneInfo(SERVICE_TIMEZONE) + now = datetime.now(tz) + + current_hour = now.hour + current_minute = now.minute + + # Check if before start time + if current_hour < SERVICE_START_HOUR: + return False + if current_hour == SERVICE_START_HOUR and current_minute < SERVICE_START_MINUTE: + return False + + # Check if after end time + if current_hour > SERVICE_END_HOUR: + return False + if current_hour == SERVICE_END_HOUR and current_minute >= SERVICE_END_MINUTE: + return False + + return True + + +def wait_until_service_hours(logger): + """Wait until service hours begin.""" + tz = ZoneInfo(SERVICE_TIMEZONE) + + while not is_within_service_hours(): + now = datetime.now(tz) + logger.info( + f"Outside service hours (current time: {now.strftime('%H:%M %Z')}). Waiting...") + # Check every 5 minutes + sleep(300) + + +def download_consolidated_data(stop_code: int) -> list[dict]: + URL = f"https://busurbano.costas.dev/api/vigo/GetConsolidatedCirculations?stopId={stop_code}" + + response = requests.get(URL) + if response.status_code == 200: + return response.json() + else: + return [] + + +def get_consolidated_data(stop_code: int): + raw_data = download_consolidated_data(stop_code) + + processed_items = [] + for item in raw_data: + line = item.get("line") + route = item.get("route") + schedule = item.get("schedule", None) + real_time = item.get("realTime", None) + + # If either is missing, skip this item + if schedule is None or real_time is None: + continue + + processed_items.append( + { + "line": line, + "route": route, + "service_id": schedule.get("serviceId"), + "trip_id": schedule.get("tripId"), + "running": schedule.get("running", False), + "scheduled_minutes": schedule.get("minutes"), + "real_time_minutes": real_time.get("minutes"), + } + ) + + return processed_items + + +def main(): + """Main collection loop that continuously gathers and stores delay data.""" + # Setup logging + logger = setup_logging() + + # Log system time on startup + tz = ZoneInfo(SERVICE_TIMEZONE) + system_time = datetime.now(tz) + logger.info(f"=== Delay Collector Starting ===") + logger.info(f"System time: {system_time.strftime('%Y-%m-%d %H:%M:%S %Z')}") + logger.info( + f"Database: {os.getenv('DB_HOST', 'localhost')}:{os.getenv('DB_PORT', '5432')}/{os.getenv('DB_NAME', 'busurbano')}") + + # Calculate spacing between requests to evenly distribute them + # We want to complete all stops every FREQUENCY_SECONDS, so space them evenly + request_interval = FREQUENCY_SECONDS / len(STOP_CODES) + + logger.info(f"Monitoring {len(STOP_CODES)} stops") + logger.info(f"Collection cycle: {FREQUENCY_SECONDS} seconds (all stops)") + logger.info( + f"Request interval: {request_interval:.2f} seconds (between stops)") + logger.info( + f"Service hours: {SERVICE_START_HOUR}:00 - {SERVICE_END_HOUR}:{SERVICE_END_MINUTE:02d} {SERVICE_TIMEZONE}") + logger.info("Press Ctrl+C to stop\n") + + total_records = 0 + + try: + while True: + # Check if within service hours + if not is_within_service_hours(): + tz = ZoneInfo(SERVICE_TIMEZONE) + now = datetime.now(tz) + logger.info( + f"Outside service hours (current: {now.strftime('%H:%M %Z')}). Pausing collection.") + logger.info(f"Total records collected today: {total_records}") + + # Wait until service hours resume + wait_until_service_hours(logger) + logger.info("Service hours resumed. Resuming collection...") + # Reset daily counter + total_records = 0 + continue + + cycle_start = time() + + # Collect from each stop, evenly spaced + for stop_code in STOP_CODES: + request_start = time() + + try: + # Capture the exact time of observation + observed_at = datetime.now(ZoneInfo('UTC')) + + # Fetch and process data + data = get_consolidated_data(stop_code) + + # Store in database + if data: + records_inserted = insert_observations( + data, stop_code, observed_at) + total_records += records_inserted + logger.info( + f"Stop {stop_code}: {records_inserted} observations (Total today: {total_records})") + else: + logger.debug(f"Stop {stop_code}: No observations") + + except Exception as e: + logger.error(f"Error processing stop {stop_code}: {e}") + + # Sleep to maintain even spacing between requests + request_elapsed = time() - request_start + sleep_time = request_interval - request_elapsed + if sleep_time > 0: + sleep(sleep_time) + + # After completing all stops, wait if we finished early to maintain the cycle time + cycle_elapsed = time() - cycle_start + remaining_time = FREQUENCY_SECONDS - cycle_elapsed + if remaining_time > 0: + logger.debug( + f"Cycle completed in {cycle_elapsed:.1f}s, waiting {remaining_time:.1f}s") + sleep(remaining_time) + + except KeyboardInterrupt: + logger.info("\n=== Collection stopped by user ===") + logger.info(f"Total records collected: {total_records}") + sys.exit(0) + + +if __name__ == "__main__": + main() |
