Practical 9 Code Solution

GA Result

Part 2: Parallel GVI

Part 1: Parallel Distortion

Part 2: Parallel GVI

"""
Understanding GIS: Practical 9
@author jonnyhuck

Calculate multiple Viewsheds using Bresenham's Line and Midpoint Circle Algorithm in parallel

References:
    https://rasterio.readthedocs.io/en/latest/api/rasterio.plot.html
    https://rasterio.readthedocs.io/en/stable/topics/plotting.html
    https://matplotlib.org/3.1.1/gallery/color/colormap_reference.html
    https://matplotlib.org/stable/gallery/text_labels_and_annotations/custom_legends.html
    http://webhelp.esri.com/arcgisdesktop/9.3/index.cfm?TopicName=How%20Visibility%20works
    https://www.zoran-cuckovic.from.hr/QGIS-visibility-analysis/help_qgis2.html#algorithm-options
    https://matplotlib.org/api/_as_gen/matplotlib.axes.Axes.contour.html#matplotlib.axes.Axes.contour
    https://docs.python.org/3/library/multiprocessing.shared_memory.html
    https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor
    
New Topics:
    Data Types
    SharedMemory
    ProcessPoolExecutor
    as_completed
"""

from gc import collect
from math import hypot
from time import perf_counter
from geopandas import read_file
from rasterio import open as rio_open
from rasterio.transform import rowcol
from multiprocessing import get_context
from matplotlib.colors import Normalize
from matplotlib.cm import ScalarMappable
from rasterio.plot import show as rio_show
from matplotlib.pyplot import subplots, savefig
from skimage.draw import line, circle_perimeter
from matplotlib_scalebar.scalebar import ScaleBar
from multiprocessing.shared_memory import SharedMemory
from concurrent.futures import ProcessPoolExecutor, as_completed
from numpy import column_stack, array, dtype, ndarray, nan, intp, sum as np_sum


def coord_2_img(transform, x, y):
    """ 
    * Convert from coordinate space to image space
    """
    r, c = rowcol(transform, x, y)
    return int(r), int(c)


def attach_shared_memory(shm_name, shape, dtype_str):
    """
    * Attach to raster dataset in shared memory 
    * Returns: (memory reference, band dataset)
    """
    # get reference to memory location of the shared dataset
    existing_shm = SharedMemory(name=shm_name)
    
    # construct a numpy array object using that location in memory
    shared_data = ndarray(shape, dtype=dtype(dtype_str), buffer=existing_shm.buf)
    
    # return a tuple in the form (memory reference, numpy array)
    return (existing_shm, shared_data)


def line_of_sight_coords(r0, c0, z0, r1, c1, radius, dsm_data):
    """
    * Use Bresenham's Line algorithm to calculate a line of sight from one point to another point, 
    *	returning a list of visible cells
    """
    
    # init variable for biggest dydx so far (starts at -infinity)
    max_dydx = -float('inf')
    
    # iterate along integer raster cells on line from observer to target
    visible_coords = []
    for r, c in column_stack(line(r0, c0, r1, c1))[1:]: # skip the observer cell itself
        
        # distance in pixels from observer (Euclidean)
        dx = hypot(c0 - c, r0 - r)

        # if we go too far, or go off the edge of the data, stop looping
        if dx > radius or not (0 <= r < dsm_data.shape[0]) or not (0 <= c < dsm_data.shape[1]):
            break

        # slope from observer to the *top* of this cell
        cur_dydx =  (dsm_data[r, c] - z0) / dx

        # if this slope is greater than any previous slope, this cell is visible
        if cur_dydx > max_dydx:
            visible_coords.append((int(r), int(c)))

            # update max_slope to block further lower-angle points
            max_dydx = cur_dydx

    return visible_coords


def gvi_worker(x0, y0, radius_m, observer_height, dsm, trees, transform):
    """
    * Calculate Green Visibility Index based on Labib et al. (2021)
    *   Note that there is no target height in this case, as we are using 
    *   the DSM (so it is 0).
    """
    # attach to shared data objects using argument expansion
    existing_dsm_shm, dsm_data = attach_shared_memory(*dsm)
    existing_trees_shm, trees_data = attach_shared_memory(*trees)

    # use try-finally statement to ensure that we close the shared memory references
    try:
        # convert observer location to raster row/col
        r0, c0 = coord_2_img(transform, x0, y0)
        
        # catch if the dataset is outside the dataset
        if not (0 <= r0 < dsm_data.shape[0]) or not (0 <= c0 < dsm_data.shape[1]):
            print(f"ERROR: data point {x0},{y0} outside of dataset")
            return None

        # radius in pixels (use absolute of pixel x-scale; assumes square pixels)
        radius_px = int(radius_m / abs(transform[0]))

        # observer absolute elevation: DSM value at observer plus observer height
        observer_abs_elev = dsm_data[r0, c0] + observer_height

        # collect visible coords in a set (to avliod duplicates)
        visible = set()

        # add the observer location
        visible.add((int(r0), int(c0)))

        # iterate perimeter points to cast rays
        for rr, cc in column_stack(circle_perimeter(r0, c0, radius_px)):

            # for each line, get visible coords and add to set
            for rc in line_of_sight_coords(r0, c0, observer_abs_elev, int(rr), int(cc), radius_px, dsm_data):
                visible.add(rc)

        # convert to numpy arrays for fast indexing and ensure indices are valid
        rows, cols = zip(*visible)
        rows = array(rows, dtype=intp)  # intp is a convenient alias for the default integer type used by the system
        cols = array(cols, dtype=intp)

        # count visible tree pixels
        visible_trees = np_sum(trees_data[rows, cols] == 1)

        # return GVI value (proportion of visible cells that are tree pixels)
        return visible_trees / float(len(rows))

    finally:
        # Always close shared memory handles in worker before exiting
        existing_dsm_shm.close()
        existing_trees_shm.close()


# main block
if __name__ == "__main__":

    # load raster datasets
    with rio_open("./data/bolton/1m_data/bolton_dsm.tif") as dsm:
        with rio_open("./data/bolton/1m_data/bolton_trees_resample.tif") as trees:

            # read dem and dsm data
            dsm_data = dsm.read(1)
            trees_data = trees.read(1)

            # read schools data
            schools = read_file("./data/bolton/primary_schools.shp")
            # print(dsm.crs.to_epsg(), trees.crs.to_epsg(), schools.crs.to_epsg())

            # use try-finally block to ensure that we always close the Shared Memory
            try:

                # put DSM in shared memory
                dsm_nbytes = dsm_data.size * dsm_data.dtype.itemsize
                dsm_shm = SharedMemory(create=True, size=dsm_nbytes)
                dsm_shared_arr = ndarray(dsm_data.shape, dtype=dsm_data.dtype, buffer=dsm_shm.buf)
                dsm_shared_arr[:] = dsm_data[:]

                # put trees dataset in shared memory
                trees_nbytes = trees_data.size * trees_data.dtype.itemsize
                trees_shm = SharedMemory(create=True, size=trees_nbytes)
                trees_shared_arr = ndarray(trees_data.shape, dtype=trees_data.dtype, buffer=trees_shm.buf)
                trees_shared_arr[:] = trees_data[:]

                # start timer
                start = perf_counter()

                # build task list (one per school)
                tasks = []
                for idx, school in schools.iterrows():
                    
                    # this is a tuple (id, dictionary), where the dictionary reflects the argument for gvi_worker()
                    tasks.append((idx, {
                        'x0': school.geometry.x,
                        'y0': school.geometry.y,
                        'radius_m': 800,
                        'observer_height': 1.7,
                        'dsm': (dsm_shm.name, dsm_data.shape, str(dsm_data.dtype)),
                        'trees': (trees_shm.name, trees_data.shape, str(trees_data.dtype)),
                        'transform': dsm.transform
                    }))

                # create an Executor to run GVI calculations in parallel - force spawn context
                # with ProcessPoolExecutor(mp_context=get_context("spawn"), max_workers=1) as executor:    # to simulate serial + shared memory...
                with ProcessPoolExecutor(mp_context=get_context("spawn")) as executor:
                    
                    # submit tasks and keep mapping future -> task idx for assignment
                    future_map = {}
                    for task in tasks:

                        # extract the school ID and parameter values from the task
                        idx, task_args = task
                        
                        # submit the task to the executor
                        future = executor.submit(gvi_worker, **task_args)
                        
                        # record the ID for this school, so that we can assign the correct value later
                        future_map[future] = idx

                    # report how many processes were launched
                    print(f"Launched {len(executor._processes)} processes...\n")

                    # process the processes as they complete using as_completed()
                    for future in as_completed(future_map.keys()):
                        
                        # get the ID for the school in this process
                        idx = future_map[future]
                        
                        # use try-except so if one school fails they don't all fail
                        try:

                            # get the result for this future object and load into the schools GeoDataFrame
                            schools.loc[idx, "gvi"] = future.result()

                        except Exception as e:
                            print(f"ERROR: Worker raised an exception: {e}")
                            schools.loc[idx, "gvi"] = nan

                # report performance counter results 
                print(f"{len(schools.index)} Schools analysed in {perf_counter() - start:.2f} seconds.")

                # Plot DEM + GVI points
                fig, my_ax = subplots(1, 1, figsize=(16, 10))
                my_ax.set_title("Green Visibility Index (GVI) per school")

                # show trees raster as context (transparent)
                rio_show(
                    trees,
                    ax=my_ax,
                    transform = trees.transform,
                    cmap = 'Greens',
                    alpha=0.4,
                )

                # Plot schools coloured by computed GVI; use only the selected subset for clarity
                schools.plot(
                    ax = my_ax,
                    column='gvi',
                    cmap = 'Greens',
                    markersize = 120,
                    edgecolor = 'black',
                )

                # add a colour bar for GVI
                if schools["gvi"].notna().any():
                    fig.colorbar(
                        ScalarMappable(norm=Normalize(vmin=schools["gvi"].min(), vmax=schools["gvi"].max()), cmap='Greens'),
                        ax=my_ax,
                        pad=0.01,
                        label="GVI"
                    )

                # add north arrow
                x, y, arrow_length = 0.97, 0.99, 0.1
                my_ax.annotate('N', xy=(x, y), xytext=(x, y-arrow_length),
                    arrowprops=dict(facecolor='black', width=5, headwidth=15),
                    ha='center', va='center', fontsize=20, xycoords=my_ax.transAxes)

                # add scalebar
                my_ax.add_artist(ScaleBar(dx=1, units="m", location="lower right"))

                # output
                savefig('./out/9.png', bbox_inches='tight')

            # make sure the below runs, even if there is an error
            finally:

                # destroy numpy references to the shared memory
                del dsm_shared_arr
                del trees_shared_arr
            
                # now force garbage collection to destroy array base pointers
                collect()
            
                # finally, it is safe to close and unlink the shared memory
                dsm_shm.close()
                dsm_shm.unlink()
                trees_shm.close()
                trees_shm.unlink()
                
    print("done!")

Part 1: Parallel Distortion

from pandas import DataFrame
from time import perf_counter
from geopandas import read_file
from week5 import evaluate_distortion
from pyproj import Geod, CRS, Transformer
from concurrent.futures import ProcessPoolExecutor, as_completed


def distortion_worker(geo_string, proj_string, g, minx, miny, maxx, maxy, minr=10000, maxr=1000000, samples=10000):
    """ construct a transformer and evaluate distortion """

    # construct transformer
    transformer = Transformer.from_crs(CRS.from_proj4(geo_string), CRS.from_proj4(proj_string), always_xy=True)

    # calculate the distortion and return
    return evaluate_distortion(g, transformer, minx, miny, maxx, maxy, minr, maxr, samples)


# main code block
if __name__ == "__main__":

    # set the projected CRSs to evaluate for distortion (All Equal Areas from projectionwizard.org)
    projections = [
        {'name':'Mollweide', 'proj':'+proj=moll +lon_0=0 +datum=WGS84 +units=m +no_defs'},
        {'name':'Hammer', 'proj':"+proj=hammer +lon_0=0 +datum=WGS84 +units=m +no_defs"},
        {'name':'Equal Earth', 'proj':"+proj=eqearth +lon_0=0 +datum=WGS84 +units=m +no_defs"},
        {'name':'Eckert IV', 'proj':"+proj=eck4 +lon_0=0 +x_0=0 +y_0=0 +datum=WGS84 +units=m +no_defs"},
        {'name':'Wagner IV', 'proj':"+proj=wag4 +lon_0=0 +datum=WGS84 +units=m +no_defs"},
        {'name':'Wagner VII', 'proj':"+proj=wag7 +lon_0=0 +datum=WGS84 +units=m +no_defs"},
        ]

    # set the geographical proj string and ellipsoid (should be the same)
    geo_string = "+proj=longlat +datum=WGS84 +no_defs"
    g = Geod(ellps='WGS84')

    # load the shapefile of countries and extract country of interest
    world = read_file("./data/natural-earth/ne_10m_admin_0_countries.shp")

    # set country using country code
    country = world.loc[world.ISO_A3 == "ISL"]

    # get the bounds of the country
    minx, miny, maxx, maxy = country.total_bounds

    # start timer
    start = perf_counter()

    # initialise a dataframe to hold the results
    results = DataFrame(projections)

    # build task list (one per school)
    tasks = []
    for idx, p in results.iterrows():
        
        # this is a tuple (id, dictionary), where the dictionary contains the arguments for distortion_worker
        tasks.append((idx, {
            'geo_string': geo_string, 
            'proj_string': p['proj'], 
            'g': g, 
            'minx': minx, 
            'miny': miny, 
            'maxx': maxx, 
            'maxy': maxy
        }))

    # create an Executor to run GVI calculations in parallel
    with ProcessPoolExecutor() as executor:

        # submit tasks and keep mapping future -> task idx for assignment
        future_map = {}
        for task in tasks:

            # use variable expansion to get the id of the projection and the function arguments
            idx, args = task
            
            # submit the task to the executor - this calls the function with the arguments in task_args
            future = executor.submit(distortion_worker, **args)
            
            # record the ID so that we can assign the correct value later
            future_map[future] = idx

        # report how many processes were launched
        print(f"Launched {len(executor._processes)} processes...\n")

        # process the threads as they complete using as_completed()
        for future in as_completed(future_map.keys()):
            
            # get the ID for the projection in this process
            idx = future_map[future]
            
            # get the result for this future object and load into the schools GeoDataFrame
            results.loc[idx, ['Ep', 'Es', 'Ea']] = future.result()

    # print the results, sorted by area distortion
    print(results.sort_values('Ea'))
    print(f"\nCompleted in {perf_counter() - start:.2f} seconds.")