162 lines
5.1 KiB
Python
162 lines
5.1 KiB
Python
"""
|
|
Moduł do obliczania izochron dla połączeń kolejowych
|
|
"""
|
|
import networkx as nx
|
|
from shapely.geometry import Point, MultiPoint
|
|
|
|
|
|
class IsochroneCalculator:
|
|
"""Klasa do obliczania i generowania izochron"""
|
|
|
|
def __init__(self, graph):
|
|
"""
|
|
Args:
|
|
graph: NetworkX DiGraph z wagami jako czasy podróży
|
|
"""
|
|
self.graph = graph
|
|
|
|
def calculate_travel_times(self, origin_stop_id, max_time=120):
|
|
"""
|
|
Oblicza czasy podróży z punktu startowego do wszystkich osiągalnych przystanków
|
|
|
|
Args:
|
|
origin_stop_id: ID przystanku początkowego
|
|
max_time: maksymalny czas podróży w minutach
|
|
|
|
Returns:
|
|
dict: {stop_id: travel_time_in_minutes}
|
|
"""
|
|
if origin_stop_id not in self.graph:
|
|
raise ValueError(f'Przystanek {origin_stop_id} nie istnieje w grafie')
|
|
|
|
# Dijkstra - najkrótsze ścieżki
|
|
try:
|
|
lengths = nx.single_source_dijkstra_path_length(
|
|
self.graph,
|
|
origin_stop_id,
|
|
cutoff=max_time,
|
|
weight='weight'
|
|
)
|
|
return lengths
|
|
except nx.NodeNotFound:
|
|
return {}
|
|
|
|
def create_isochrones(self, origin_stop_id, time_intervals=[30, 60, 90, 120]):
|
|
"""
|
|
Tworzy wielokąty izochron dla różnych przedziałów czasowych
|
|
|
|
Args:
|
|
origin_stop_id: ID przystanku początkowego
|
|
time_intervals: lista czasów w minutach dla których generować izochrony
|
|
|
|
Returns:
|
|
list of dict: [{'time': 30, 'geometry': {...}, 'stops': [...]}, ...]
|
|
"""
|
|
max_time = max(time_intervals)
|
|
travel_times = self.calculate_travel_times(origin_stop_id, max_time)
|
|
|
|
if not travel_times:
|
|
return []
|
|
|
|
# Grupuj przystanki według przedziałów czasowych
|
|
isochrones = []
|
|
for time_limit in sorted(time_intervals):
|
|
stops_in_range = {
|
|
stop_id: time
|
|
for stop_id, time in travel_times.items()
|
|
if time <= time_limit
|
|
}
|
|
|
|
if not stops_in_range:
|
|
continue
|
|
|
|
# Pobierz współrzędne przystanków
|
|
points = []
|
|
stop_ids = []
|
|
for stop_id in stops_in_range.keys():
|
|
node_data = self.graph.nodes[stop_id]
|
|
points.append([node_data['lon'], node_data['lat']])
|
|
stop_ids.append(stop_id)
|
|
|
|
if len(points) < 3:
|
|
# Za mało punktów dla wielokąta - użyj bufora
|
|
geometry = self._create_buffer_polygon(points, buffer_km=10)
|
|
else:
|
|
# Stwórz wypukły wielokąt (convex hull) lub użyj Voronoi
|
|
geometry = self._create_concave_hull(points)
|
|
|
|
isochrones.append({
|
|
'time': time_limit,
|
|
'geometry': geometry,
|
|
'stops': stop_ids,
|
|
'stop_count': len(stop_ids)
|
|
})
|
|
|
|
return isochrones
|
|
|
|
def _create_buffer_polygon(self, points, buffer_km=10):
|
|
"""Tworzy wielokąt z buforem wokół punktów"""
|
|
if not points:
|
|
return None
|
|
|
|
# Konwersja km na stopnie (przybliżenie)
|
|
buffer_degrees = buffer_km / 111.0
|
|
|
|
multi_point = MultiPoint([Point(p) for p in points])
|
|
buffered = multi_point.buffer(buffer_degrees)
|
|
|
|
return self._geometry_to_geojson(buffered)
|
|
|
|
def _create_concave_hull(self, points, alpha=0.3):
|
|
"""
|
|
Tworzy concave hull (wielokąt wklęsły) dla punktów
|
|
|
|
Args:
|
|
points: lista [lon, lat]
|
|
alpha: parametr kształtu (0-1, mniejszy = bardziej wklęsły)
|
|
"""
|
|
if len(points) < 3:
|
|
return None
|
|
|
|
# Prosty convex hull jako punkt startowy
|
|
multi_point = MultiPoint([Point(p) for p in points])
|
|
hull = multi_point.convex_hull
|
|
|
|
# Dodaj lekki bufor dla lepszej wizualizacji
|
|
buffered = hull.buffer(0.05) # ~5.5 km
|
|
|
|
return self._geometry_to_geojson(buffered)
|
|
|
|
def _geometry_to_geojson(self, geometry):
|
|
"""Konwertuje Shapely geometry do GeoJSON"""
|
|
if geometry is None:
|
|
return None
|
|
|
|
return geometry.__geo_interface__
|
|
|
|
def get_reachable_stops(self, origin_stop_id, max_time=120):
|
|
"""
|
|
Zwraca listę wszystkich osiągalnych przystanków z czasami podróży
|
|
|
|
Args:
|
|
origin_stop_id: ID przystanku początkowego
|
|
max_time: maksymalny czas podróży w minutach
|
|
|
|
Returns:
|
|
list of dict: [{'stop_id': ..., 'name': ..., 'lat': ..., 'lon': ..., 'time': ...}]
|
|
"""
|
|
travel_times = self.calculate_travel_times(origin_stop_id, max_time)
|
|
|
|
reachable = []
|
|
for stop_id, time in travel_times.items():
|
|
node_data = self.graph.nodes[stop_id]
|
|
reachable.append({
|
|
'stop_id': stop_id,
|
|
'name': node_data.get('name', 'Unknown'),
|
|
'lat': node_data['lat'],
|
|
'lon': node_data['lon'],
|
|
'time': round(time, 1)
|
|
})
|
|
|
|
return sorted(reachable, key=lambda x: x['time'])
|