koleo-izochrona/backend/isochrone_calculator.py

162 lines
5.1 KiB
Python

"""
Moduł do obliczania izochron dla połączeń kolejowych
"""
import networkx as nx
from shapely.geometry import Point, MultiPoint
class IsochroneCalculator:
"""Klasa do obliczania i generowania izochron"""
def __init__(self, graph):
"""
Args:
graph: NetworkX DiGraph z wagami jako czasy podróży
"""
self.graph = graph
def calculate_travel_times(self, origin_stop_id, max_time=120):
"""
Oblicza czasy podróży z punktu startowego do wszystkich osiągalnych przystanków
Args:
origin_stop_id: ID przystanku początkowego
max_time: maksymalny czas podróży w minutach
Returns:
dict: {stop_id: travel_time_in_minutes}
"""
if origin_stop_id not in self.graph:
raise ValueError(f'Przystanek {origin_stop_id} nie istnieje w grafie')
# Dijkstra - najkrótsze ścieżki
try:
lengths = nx.single_source_dijkstra_path_length(
self.graph,
origin_stop_id,
cutoff=max_time,
weight='weight'
)
return lengths
except nx.NodeNotFound:
return {}
def create_isochrones(self, origin_stop_id, time_intervals=[30, 60, 90, 120]):
"""
Tworzy wielokąty izochron dla różnych przedziałów czasowych
Args:
origin_stop_id: ID przystanku początkowego
time_intervals: lista czasów w minutach dla których generować izochrony
Returns:
list of dict: [{'time': 30, 'geometry': {...}, 'stops': [...]}, ...]
"""
max_time = max(time_intervals)
travel_times = self.calculate_travel_times(origin_stop_id, max_time)
if not travel_times:
return []
# Grupuj przystanki według przedziałów czasowych
isochrones = []
for time_limit in sorted(time_intervals):
stops_in_range = {
stop_id: time
for stop_id, time in travel_times.items()
if time <= time_limit
}
if not stops_in_range:
continue
# Pobierz współrzędne przystanków
points = []
stop_ids = []
for stop_id in stops_in_range.keys():
node_data = self.graph.nodes[stop_id]
points.append([node_data['lon'], node_data['lat']])
stop_ids.append(stop_id)
if len(points) < 3:
# Za mało punktów dla wielokąta - użyj bufora
geometry = self._create_buffer_polygon(points, buffer_km=10)
else:
# Stwórz wypukły wielokąt (convex hull) lub użyj Voronoi
geometry = self._create_concave_hull(points)
isochrones.append({
'time': time_limit,
'geometry': geometry,
'stops': stop_ids,
'stop_count': len(stop_ids)
})
return isochrones
def _create_buffer_polygon(self, points, buffer_km=10):
"""Tworzy wielokąt z buforem wokół punktów"""
if not points:
return None
# Konwersja km na stopnie (przybliżenie)
buffer_degrees = buffer_km / 111.0
multi_point = MultiPoint([Point(p) for p in points])
buffered = multi_point.buffer(buffer_degrees)
return self._geometry_to_geojson(buffered)
def _create_concave_hull(self, points, alpha=0.3):
"""
Tworzy concave hull (wielokąt wklęsły) dla punktów
Args:
points: lista [lon, lat]
alpha: parametr kształtu (0-1, mniejszy = bardziej wklęsły)
"""
if len(points) < 3:
return None
# Prosty convex hull jako punkt startowy
multi_point = MultiPoint([Point(p) for p in points])
hull = multi_point.convex_hull
# Dodaj lekki bufor dla lepszej wizualizacji
buffered = hull.buffer(0.05) # ~5.5 km
return self._geometry_to_geojson(buffered)
def _geometry_to_geojson(self, geometry):
"""Konwertuje Shapely geometry do GeoJSON"""
if geometry is None:
return None
return geometry.__geo_interface__
def get_reachable_stops(self, origin_stop_id, max_time=120):
"""
Zwraca listę wszystkich osiągalnych przystanków z czasami podróży
Args:
origin_stop_id: ID przystanku początkowego
max_time: maksymalny czas podróży w minutach
Returns:
list of dict: [{'stop_id': ..., 'name': ..., 'lat': ..., 'lon': ..., 'time': ...}]
"""
travel_times = self.calculate_travel_times(origin_stop_id, max_time)
reachable = []
for stop_id, time in travel_times.items():
node_data = self.graph.nodes[stop_id]
reachable.append({
'stop_id': stop_id,
'name': node_data.get('name', 'Unknown'),
'lat': node_data['lat'],
'lon': node_data['lon'],
'time': round(time, 1)
})
return sorted(reachable, key=lambda x: x['time'])