koleo-izochrona/backend/gtfs_loader.py

119 lines
4.7 KiB
Python

"""
Moduł do ładowania i przetwarzania danych GTFS
"""
import gtfs_kit as gk
import pandas as pd
import networkx as nx
from pathlib import Path
from datetime import datetime, timedelta
class GTFSLoader:
"""Klasa do ładowania i parsowania danych GTFS"""
def __init__(self, gtfs_path):
"""
Args:
gtfs_path: ścieżka do pliku ZIP z danymi GTFS
"""
print(f'📦 Ładowanie danych GTFS z {gtfs_path}...')
print(' To może potrwać 1-2 minuty dla dużego pliku...')
self.feed = gk.read_feed(gtfs_path, dist_units='km')
print(f'✓ Załadowano {len(self.feed.stops)} przystanków')
print(f'✓ Znaleziono {len(self.feed.routes)} linii')
print(f'✓ Znaleziono {len(self.feed.trips)} kursów')
def get_stops(self):
"""Zwraca DataFrame ze wszystkimi przystankami"""
return self.feed.stops[['stop_id', 'stop_name', 'stop_lat', 'stop_lon']]
def build_route_graph(self, date=None, start_time='06:00:00', end_time='22:00:00'):
"""
Buduje graf połączeń kolejowych dla określonej daty i przedziału czasowego
Args:
date: data w formacie YYYYMMDD (domyślnie: dzisiaj)
start_time: początek przedziału czasowego
end_time: koniec przedziału czasowego
Returns:
NetworkX DiGraph z wagami jako czasy podróży w minutach
"""
if date is None:
date = datetime.now().strftime('%Y%m%d')
print(f'🔨 Budowanie grafu dla daty {date}, godz. {start_time}-{end_time}...')
print(' Może to potrwać 2-3 minuty...')
# Filtruj trips dla wybranej daty
print(' 1/5 Kopiowanie danych trips...')
trips = self.feed.trips.copy()
stop_times = self.feed.stop_times.copy()
# Połącz trips ze stop_times
print(' 2/5 Łączenie trips ze stop_times...')
stop_times = stop_times.merge(trips[['trip_id', 'route_id']], on='trip_id')
# Filtruj po czasie
print(' 3/5 Filtrowanie po czasie...')
stop_times = stop_times[
(stop_times['arrival_time'] >= start_time) &
(stop_times['departure_time'] <= end_time)
]
# Sortuj po trip_id i stop_sequence
print(' 4/5 Sortowanie...')
stop_times = stop_times.sort_values(['trip_id', 'stop_sequence'])
# Buduj graf
print(' 5/5 Budowanie grafu połączeń...')
G = nx.DiGraph()
# Dodaj wszystkie przystanki jako węzły
total_stops = len(self.feed.stops)
print(f' Dodawanie {total_stops} przystanków...')
for _, stop in self.feed.stops.iterrows():
G.add_node(stop['stop_id'],
name=stop['stop_name'],
lat=stop['stop_lat'],
lon=stop['stop_lon'])
# Dodaj krawędzie między kolejnymi przystankami w trasie
grouped = list(stop_times.groupby('trip_id'))
total_trips = len(grouped)
print(f' Przetwarzanie {total_trips} kursów...')
for i, (trip_id, group) in enumerate(grouped):
stops = group.sort_values('stop_sequence')
for i in range(len(stops) - 1):
from_stop = stops.iloc[i]
to_stop = stops.iloc[i + 1]
# Oblicz czas podróży w minutach
from_time = pd.to_datetime(from_stop['departure_time'])
to_time = pd.to_datetime(to_stop['arrival_time'])
travel_time = (to_time - from_time).total_seconds() / 60
if travel_time < 0:
travel_time += 24 * 60 # Następny dzień
# Dodaj lub zaktualizuj krawędź (wybierz najszybsze połączenie)
if G.has_edge(from_stop['stop_id'], to_stop['stop_id']):
current_time = G[from_stop['stop_id']][to_stop['stop_id']]['weight']
if travel_time < current_time:
G[from_stop['stop_id']][to_stop['stop_id']]['weight'] = travel_time
else:
G.add_edge(from_stop['stop_id'], to_stop['stop_id'],
weight=travel_time)
# Progress indicator co 100 kursów
if (i + 1) % 100 == 0:
print(f' Przetworzono {i + 1}/{total_trips} kursów...')
print(f'✓ Graf zbudowany: {G.number_of_nodes()} węzłów, {G.number_of_edges()} krawędzi')
return G
def find_stops_by_name(self, name_fragment):
"""Znajduje przystanki po fragmencie nazwy"""
stops = self.get_stops()
mask = stops['stop_name'].str.contains(name_fragment, case=False, na=False)
return stops[mask]