"""
Understanding GIS: Practical 8
@author jonnyhuck
Find the shortest path between two points in Kaliningrad
Note that I have used `distance` (and variants) in the variable names for clarity - this should really be cost,
and would not necessarily have to include distance as a component (though it normally would be).
References:
https://geoffboeing.com/2016/11/osmnx-python-street-networks/
https://networkx.github.io/documentation/stable/reference/algorithms/euler.html
https://networkx.github.io/documentation/stable/reference/algorithms/generated/networkx.algorithms.shortest_paths.astar.astar_path.html#networkx.algorithms.shortest_paths.astar.astar_path
https://github.com/gboeing/osmnx/issues/247
https://toblerity.org/rtree/
https://networkx.org/documentation/stable/_modules/networkx/algorithms/shortest_paths/astar.html
https://github.com/networkx/networkx/blob/b469bdd153e4ed89c1ec166aa9bec05e5653644f/networkx/algorithms/shortest_paths/weighted.py
New Topics:
Exceptions
heapq
"""
from sys import exit
from pyproj import Geod
from rtree import index
from osmnx import graph_from_xml
from heapq import heappush, heappop
from matplotlib.patches import Patch
from geopandas import read_file, GeoSeries
from shapely.geometry import LineString, Point
from pickle import dump, load, HIGHEST_PROTOCOL
from matplotlib_scalebar.scalebar import ScaleBar
from matplotlib.pyplot import subplots, savefig, Line2D
from networkx import NodeNotFound, NetworkXNoPath, astar_path as astar_path_nx, DiGraph
def ellipsoidal_distance(node_a, node_b):
"""
Calculate the 'as the crow flies' distance between two nodes in a graph using
the Inverse Vincenty method, via the PyProj library.
"""
# extract the data (the coordinates) from node_a and node_b
point_a = graph.nodes(data=True)[node_a]
point_b = graph.nodes(data=True)[node_b]
# compute the distance across the WGS84 ellipsoid (the one used by the dataset)
return Geod(ellps='WGS84').inv(point_a['x'], point_a['y'], point_b['x'], point_b['y'])[2]
def reconstruct_path(end_node, parent_node, parents):
"""
Once we have found the end node of our route, reconstruct the shortest path
to it using the parents list
"""
# initialise a list that will contain the path, beginning with the current node (the end of the route)
path = [end_node]
# then get the parent (the node from which we arrived at the end of the route)
node = parent_node
# loop back through the list of explored nodes until we reach the start node (the one where parent == None)
while node is not None:
# for each node in the path, add it to the path...
path.append(node)
# ...and move on to its parent (the one before it in the path)
node = parents[node]
# finally, reverse the path (so it goes start -> end) and return it
path.reverse() # note that this is an 'in place' function that edits the list itself, it does not return anything!
return path
def path_to_linestring(start_point, path_list, end_point):
"""
Convert a shortest path to a LineString object
"""
# initialise the list with the start point
line = [start_point]
# loop through each node in the shortest path and load into list
for n in path_list:
# get the relevant node from the graph with lat lng data
node = graph.nodes(data=True)[n]
# load the lat lng data into the lineString
line.append([node['x'], node['y']])
# append end point to list
line.append(end_point)
# store as a LineString
return LineString(line)
def astar_path(G, source, target, heuristic):
"""
Calculate the shortest path from `source` to `target` using thr AStar algorithm
"""
# first, make sure that both the `source` and `target` nodes actually exist...
if source not in G or target not in G:
raise NodeNotFound(f"Either source {source} or target {target} is not in G")
# create a counter to prevent the heap from attempting to compare nodes to themselves
counter = 0
# initialise a list for the heap queue. This will be a list of tuples, each containing 4 values:
# `priority`: this is the value on which the list will be sorted by the heap queue algorithm. In
# our case, this will be the estimated distance for this node (the network distance from the
# start to this node + the straight line distance from this node to the end)
# `counter`: this is simply a unique number to make sure that the algorithm can sort nodes with
# equal `priority` values
# `node`: the ID of the node to which this entry refers
# `cost`: this is the network distance between the start point and the node, used as part of our
# routing algorithm (storing it prevents us from needing to calculate the same distance multiple
# times)
# `parent`: the ID of the node immediately before this one in the path
queue = [(0, counter, source, 0, None)]
# dictionary to keep track of the network distance from the start to this node, and the straight
# line distance from this node to the end point. This is used to decide which is the best parent
# for each node that we record in the `parents`` dictionary
distances = {}
# dictionary to keep track of the parent of each node that we have explored. This is used when we
# come to reconstruct the route once we reach the end point
parents = {}
# keep going as long as there are more nodes to search
while queue:
# pop the next node, its network distance from the start, and its parent from the queue
cur_node, cur_net_dist, cur_parent = heappop(queue)[2:] # use list slicing to ignore the first two items
''' Section 1: a series of checks for whether we should reject the node '''
# check if we have reached the destination
if cur_node == target:
# if so, reconstruct the path and return
return reconstruct_path(cur_node, cur_parent, parents)
''' Section 2: if we are visiting a node we have already seen '''
# check if we have already explored this node
if cur_node in parents:
# if we are back at the start, abandon this path and try the next one
if parents[cur_node] is None:
continue
# if we already have a shorter path to this node, abandon this new path and try the next one
if distances[cur_node][0] < cur_net_dist:
continue
''' Section 3: if we have made it this far, assess the node'''
# add the parent of the current node to the list of parents
parents[cur_node] = cur_parent
# get the neighbours for the current node (under assessment)
for neighbour, edge_data in G[cur_node].items():
# print(edge_data)
# work out the network distance to this neighbour from the start of the path - this
# is simply the distance to the current node (which we already know), plus the
# distance from that node to this neighbour (which is stored in the edge data)
dist_from_start = cur_net_dist + edge_data[0]['length']
# print(f"{dist_from_start:.2f}")
# exit()
# have we already seen this neighbour?
if neighbour in distances:
# as we have been get the network distance from the start via the previous and straight
# line distance to the end
previous_dist_from_start, dist_to_end = distances[neighbour]
# if the previous path we found to this node is shorter, abandon this new path
if previous_dist_from_start <= dist_from_start:
continue
# if we haven't seen this neighbour before, calculate the straight line distance to the end
else:
dist_to_end = heuristic(neighbour, target)
# add the two distances to the distances dictionary
distances[neighbour] = (dist_from_start, dist_to_end)
# work out the estimated distance for this path (the network distance from the start to
# this node + the straight line distance from this node to the end). This will act as the
# priority value for our heap queue - the route with the shortest estimated didstance will
# be assessed first.
estimated_dist = dist_from_start + dist_to_end
# print(f"{estimated_dist:.2f}")
# exit()
# increment counter and push to heap
counter += 1
heappush(queue, (estimated_dist, counter, neighbour, dist_from_start, cur_node))
# if the loop finishes, we didn't find a route - raise an exception
raise NetworkXNoPath(f"Node {target} not reachable from {source}")
# declare projection strings
wgs84 = "+proj=longlat +datum=WGS84 +no_defs"
utm34 = "+proj=utm +zone=34 +ellps=WGS84 +datum=WGS84 +units=m +no_defs"
# load data from serialised Pickle file
try:
with open('./data/kaliningrad.pkl', 'rb') as input:
print("Pickle file found successfully, loading data.")
# extract the serialised dictionary of the three datasets
data = load(input)
# extract the three individual datasets from the dictionary
graph = data['graph']
buildings = data['buildings']
water = data['water']
# load data from XML and Shapefiles, and create pickle for future use
except FileNotFoundError:
print("No pickle file found, loading data (takes a long time).")
# load the data from their respective files
graph = graph_from_xml('./data/kaliningrad/kaliningrad.xml')
buildings = read_file('./data/kaliningrad/buildings.shp').to_crs(utm34)
water = read_file('./data/kaliningrad/water.shp').to_crs(utm34)
# add all three to a dictionary
data = {
'graph': graph,
'buildings': buildings,
'water': water
}
# serialise the dictionary for future use
with open('./data/kaliningrad.pkl', 'wb') as output:
dump(data, output, HIGHEST_PROTOCOL)
# create spatial index from graph
idx = index.Index()
for id, data in list(graph.nodes(data=True)):
idx.insert(int(id), (data['x'], data['y'], data['x'], data['y']))
# print(idx.count(idx.bounds), len(graph.nodes))
# calculate the 'from' and 'to' node as the nearest to the specified coordinates
from_point = (20.483322,54.692934)
to_point = (20.544863,54.723827)
from_node = list(idx.nearest(from_point, 1))[0]
to_node = list(idx.nearest(to_point, 1))[0]
# print(graph.nodes()[from_node], graph.nodes()[to_node])
''' CALCULATE SHORTEST PATH(S) '''
# use try statement to catch exceptions
try:
# calculate the shortest path using my implementation
shortest_path = astar_path(graph, from_node, to_node, ellipsoidal_distance)
# print(shortest_path)
# calculate the shortest path using the networkx implementation
shortest_path_nx = astar_path_nx(DiGraph(graph), from_node, to_node, ellipsoidal_distance)
# catch exception for no path available
except NodeNotFound:
print("Sorry, there is no path between those locations in the provided network")
exit()
# catch exception for no path available
except NetworkXNoPath:
print("Sorry, there is no path between those locations in the provided network")
exit()
''' DRAW YOUR MAP '''
# print(path_to_linestring(from_point, shortest_path, to_point))
# convert linestring to GeoSeries and project to UTM zone 34
path = GeoSeries(path_to_linestring(from_point, shortest_path, to_point), crs=wgs84).to_crs(utm34)
path_nx = GeoSeries(path_to_linestring(from_point, shortest_path_nx, to_point), crs=wgs84).to_crs(utm34)
# print(path)
# report the comparative route lengths (using local projection)
print(f"My route: {path.geometry.iloc[0].length:.0f}m. Network X route: {path_nx.geometry.iloc[0].length:.0f}m.")
# create map axis object, remove axes, set title
fig, my_ax = subplots(1, 1, figsize=(16, 10))
my_ax.axis('off')
my_ax.set(title="The 4 Bridges of Kaliningrad")
# set bounds
buffer = 1000
my_ax.set_xlim([path.geometry.iloc[0].bounds[0] - buffer, path.geometry.iloc[0].bounds[2] + buffer])
my_ax.set_ylim([path.geometry.iloc[0].bounds[1] - buffer, path.geometry.iloc[0].bounds[3] + buffer])
# add the water
water.plot(
ax=my_ax,
color='#a6cee3',
linewidth = 1,
zorder=1
)
# add the buildings
buildings.plot(
ax=my_ax,
color='grey',
linewidth = 1,
zorder=2
)
# add the networkx path
path_nx.plot(
ax=my_ax,
color='#4daf4a',
linewidth = 5,
zorder=3
)
# add the path
path.plot(
ax=my_ax,
color='#984ea3',
linewidth = 3,
zorder=4
)
# add the start point
GeoSeries(Point(from_point), crs=wgs84).to_crs(utm34).plot(
ax = my_ax,
markersize = 60,
color = 'blue',
edgecolor = 'black',
zorder=5
)
# add the end point
GeoSeries(Point(to_point), crs=wgs84).to_crs(utm34).plot(
ax = my_ax,
markersize = 60,
color = 'red',
edgecolor = 'black',
zorder=6
)
# manually draw a legend
my_ax.legend([
Line2D([0], [0], marker='o', color='w', markerfacecolor='blue', markeredgecolor='black', markersize=8),
Line2D([0], [0], marker='o', color='w', markerfacecolor='red', markeredgecolor='black', markersize=8),
Patch(facecolor='grey'),
Patch(facecolor='#a6cee3', edgecolor='#a6cee3'),
Line2D([0], [0], color='#984ea3', lw=3),
Line2D([0], [0], color='#4daf4a', lw=3)],
['Origin', 'Destination', 'Buildings', 'Water', 'Path', 'Path(NX)'], loc='lower right')
# add north arrow
x, y, arrow_length = 0.99, 0.99, 0.1
my_ax.annotate('N', xy=(x, y), xytext=(x, y-arrow_length),
arrowprops=dict(facecolor='black', width=5, headwidth=15),
ha='center', va='center', fontsize=20, xycoords=my_ax.transAxes)
# add scalebar
my_ax.add_artist(ScaleBar(dx=1, units="m", location="lower left"))
# save the result
savefig(f'./out/8.png', bbox_inches='tight')
print("done!")