""" Moduł do obliczania izochron dla połączeń kolejowych """ import networkx as nx from shapely.geometry import Point, MultiPoint class IsochroneCalculator: """Klasa do obliczania i generowania izochron""" def __init__(self, graph): """ Args: graph: NetworkX DiGraph z wagami jako czasy podróży """ self.graph = graph def calculate_travel_times(self, origin_stop_id, max_time=120): """ Oblicza czasy podróży z punktu startowego do wszystkich osiągalnych przystanków Args: origin_stop_id: ID przystanku początkowego max_time: maksymalny czas podróży w minutach Returns: dict: {stop_id: travel_time_in_minutes} """ if origin_stop_id not in self.graph: raise ValueError(f'Przystanek {origin_stop_id} nie istnieje w grafie') # Dijkstra - najkrótsze ścieżki try: lengths = nx.single_source_dijkstra_path_length( self.graph, origin_stop_id, cutoff=max_time, weight='weight' ) return lengths except nx.NodeNotFound: return {} def create_isochrones(self, origin_stop_id, time_intervals=[30, 60, 90, 120]): """ Tworzy wielokąty izochron dla różnych przedziałów czasowych Args: origin_stop_id: ID przystanku początkowego time_intervals: lista czasów w minutach dla których generować izochrony Returns: list of dict: [{'time': 30, 'geometry': {...}, 'stops': [...]}, ...] """ max_time = max(time_intervals) travel_times = self.calculate_travel_times(origin_stop_id, max_time) if not travel_times: return [] # Grupuj przystanki według przedziałów czasowych isochrones = [] for time_limit in sorted(time_intervals): stops_in_range = { stop_id: time for stop_id, time in travel_times.items() if time <= time_limit } if not stops_in_range: continue # Pobierz współrzędne przystanków points = [] stop_ids = [] for stop_id in stops_in_range.keys(): node_data = self.graph.nodes[stop_id] points.append([node_data['lon'], node_data['lat']]) stop_ids.append(stop_id) if len(points) < 3: # Za mało punktów dla wielokąta - użyj bufora geometry = self._create_buffer_polygon(points, buffer_km=10) else: # Stwórz wypukły wielokąt (convex hull) lub użyj Voronoi geometry = self._create_concave_hull(points) isochrones.append({ 'time': time_limit, 'geometry': geometry, 'stops': stop_ids, 'stop_count': len(stop_ids) }) return isochrones def _create_buffer_polygon(self, points, buffer_km=10): """Tworzy wielokąt z buforem wokół punktów""" if not points: return None # Konwersja km na stopnie (przybliżenie) buffer_degrees = buffer_km / 111.0 multi_point = MultiPoint([Point(p) for p in points]) buffered = multi_point.buffer(buffer_degrees) return self._geometry_to_geojson(buffered) def _create_concave_hull(self, points, alpha=0.3): """ Tworzy concave hull (wielokąt wklęsły) dla punktów Args: points: lista [lon, lat] alpha: parametr kształtu (0-1, mniejszy = bardziej wklęsły) """ if len(points) < 3: return None # Prosty convex hull jako punkt startowy multi_point = MultiPoint([Point(p) for p in points]) hull = multi_point.convex_hull # Dodaj lekki bufor dla lepszej wizualizacji buffered = hull.buffer(0.05) # ~5.5 km return self._geometry_to_geojson(buffered) def _geometry_to_geojson(self, geometry): """Konwertuje Shapely geometry do GeoJSON""" if geometry is None: return None return geometry.__geo_interface__ def get_reachable_stops(self, origin_stop_id, max_time=120): """ Zwraca listę wszystkich osiągalnych przystanków z czasami podróży Args: origin_stop_id: ID przystanku początkowego max_time: maksymalny czas podróży w minutach Returns: list of dict: [{'stop_id': ..., 'name': ..., 'lat': ..., 'lon': ..., 'time': ...}] """ travel_times = self.calculate_travel_times(origin_stop_id, max_time) reachable = [] for stop_id, time in travel_times.items(): node_data = self.graph.nodes[stop_id] reachable.append({ 'stop_id': stop_id, 'name': node_data.get('name', 'Unknown'), 'lat': node_data['lat'], 'lon': node_data['lon'], 'time': round(time, 1) }) return sorted(reachable, key=lambda x: x['time'])