Source code for linref.events.merge

"""
===============================================================================

Module featuring classes and functionality for merging events collections and 
summarizing/retrieving information from these merges. For ease of use, features 
in this module should be accessed through collection-level merging methods such 
as EventsCollection.merge instead of abstractly through the classes themselves.


Classes
-------
EventsMerge, EventsMergeAttribute, EventsMergeTrace


Dependencies
------------
pandas, numpy, rangel, copy, warnings, functools


Development
-----------
Developed by:
Tariq Shihadah, tariq.shihadah@gmail.com

Created:
10/1/2021

Modified:
10/1/2021

===============================================================================
"""


################
# DEPENDENCIES #
################

import pandas as pd
import numpy as np
import copy, warnings
from functools import wraps
from rangel import RangeCollection


[docs]class EventsMergeAttribute(object): def __init__(self, parent, column): self.parent = parent self.column = column @property def parent(self): return self._parent @parent.setter def parent(self, obj): if not isinstance(obj, EventsMerge): raise TypeError("Input parent must be EventsMerge type.") self._parent = obj @property def column(self): return self._column @column.setter def column(self, label): # Multiple labels provided if isinstance(label, (list, tuple, np.ndarray)): if not set(label).issubset(self.parent.columns): raise ValueError("All column labels must be present in " "EventsMerge.columns.") # Process input self._ndim = 2 self._ncols = len(label) self._loc = [self.parent.right.columns.index(i) for i in label] self._column = list(label) # Single label provided else: if not label in self.parent.columns: raise ValueError("Invalid column label for merged events. Must " "be present in EventsMerge.columns.") # Process input self._ndim = 1 self._ncols = 1 self._loc = [self.parent.right.columns.index(label)] self._column = label @property def traces(self): try: return self.parent.traces except AttributeError: self.parent.build(inplace=True) return self.parent.traces @property def loc(self): return self._loc @property def ncols(self): return self._ncols @property def ndim(self): return self._ndim def _to_pandas(self, index, data, as_series=False, squeeze=True): # Create pandas object if as_series: obj = pd.Series( data=data, index=index, name=self.column ) else: obj = pd.DataFrame( data=data, index=index, columns=self.column if self._ndim != 1 else [self.column] ) if self._ndim == 1 and squeeze: obj = obj.iloc[:, 0] return obj def _get_empty(self, empty=None): return np.nan if empty is None else empty def _agg(self, func, empty=None, as_array=True, **kwargs): """ Generic attribute aggregator. Aggregation functions will be passed a single EventsMergeTrace instance for each unique EventsGroup in the left EventsCollection, along with an array of the values in the right EventsGroup indexed by the selected columns. """ # Validate fill value empty = self._get_empty(empty) # Iterate over events to create aggregated result index = [] res = [] for trace in self.traces: # Where actual traces are created, perform aggregation function if trace.success: arr = trace.group_right.df.values[:, self.loc] res_i = func(arr, trace, **kwargs) # Otherwise create a full empty array else: res_i = np.full( (trace.group_left.df.shape[0], self._ncols), empty) # Log results from trace index.extend(trace.group_left.df.index) res.append(res_i) # Combine results for each left group into a single array that is # compatible with the left collection if as_array: return np.array(index), np.concatenate(res) else: # Flatten data (relevant for geometry aggregators) return index, [i for sub in res for i in sub]
[docs] def all(self, empty=None, **kwargs): """ Return all values from intersecting events in a list. Parameters ---------- empty : scalar, string, or other pd.Series-compatible value, optional Value to use to fill when there is no matching events group and aggregation cannot be performed. If None, values will be filled with np.nan. """ def _func(arr, trace, **kwargs): # Choose all intersecting events res = np.empty((trace.mask.shape[0], self._ncols), dtype=object) res[:] = [list(arr[mask_i, :].T) for mask_i in trace.mask] return res return self._to_pandas(*self._agg(_func, empty=empty))
[docs] def any(self, empty=None, **kwargs): """ Indicate whether each record intersects with at least one event. Parameters ---------- empty : scalar, string, or other pd.Series-compatible value, optional Value to use to fill when there is no matching events group and aggregation cannot be performed. If None, values will be filled with np.nan. """ def _func(arr, trace, **kwargs): # Find any intersecting events, apply to all columns res = np.tile(trace.mask.any(axis=1), (self._ncols,1)).T return res return self._to_pandas(*self._agg(_func, empty=empty))
[docs] def unique(self, empty=None, **kwargs): """ Return all unique values from intersecting events in a tuple. Parameters ---------- empty : scalar, string, or other pd.Series-compatible value, optional Value to use to fill when there is no matching events group and aggregation cannot be performed. If None, values will be filled with np.nan. """ def _func(arr, trace, **kwargs): # Choose all intersecting events res = np.empty((trace.mask.shape[0], self._ncols), dtype=object) res[:] = [[list(set(sub)) for sub in arr[mask_i, :].T] \ for mask_i in trace.mask] return res return self._to_pandas(*self._agg(_func, empty=empty))
[docs] def first(self, empty=None): """ Return the first event value according to the order of the provided collection's events dataframe. Parameters ---------- empty : scalar, string, or other pd.Series-compatible value, optional Value to use to fill when there is no matching events group and aggregation cannot be performed. If None, values will be filled with np.nan. """ def _func(arr, trace, **kwargs): # Identify the first potential match index = np.argmax(trace.mask, axis=1) # Identify where matches are made valid = trace.mask.any(axis=1).reshape(-1,1) # Fill empty value where no match is made res = np.where(valid, arr[index], self._get_empty(empty)) return res return self._to_pandas(*self._agg(_func, empty=empty))
[docs] def last(self, empty=None): """ Return the last event value according to the order of the provided collection's events dataframe. Parameters ---------- empty : scalar, string, or other pd.Series-compatible value, optional Value to use to fill when there is no matching events group and aggregation cannot be performed. If None, values will be filled with np.nan. """ def _func(arr, trace, **kwargs): # Identify the first potential match index = np.argmax(np.flip(trace.mask, axis=1), axis=1) # Identify where matches are made valid = trace.mask.any(axis=1).reshape(-1,1) # Fill empty value where no match is made res = np.where(valid, arr[index], self._get_empty(empty)) return res return self._to_pandas(*self._agg(_func, empty=empty))
[docs] def count(self, empty=None): """ Return the count of all intersected event values. Parameters ---------- empty : scalar, string, or other pd.Series-compatible value, optional Value to use to fill when there is no matching events group and aggregation cannot be performed. If None, values will be filled with np.nan. """ def _func(arr, trace, **kwargs): # Sum count of all intersecting events, apply to all columns res = np.tile(trace.mask.sum(axis=1), (self._ncols,1)).T return res return self._to_pandas(*self._agg(_func, empty=empty))
[docs] def sum(self, empty=None, nansum=False): """ Return the sum of all intersected event values. Parameters ---------- empty : scalar, string, or other pd.Series-compatible value, optional Value to use to fill when there is no matching events group and aggregation cannot be performed. If None, values will be filled with np.nan. """ def _func(arr, trace, **kwargs): # Choose all intersecting events if nansum: res = [np.nansum(arr[mask_i, :], axis=0).T for \ mask_i in trace.mask] else: res = [np.sum(arr[mask_i, :], axis=0).T for \ mask_i in trace.mask] return res return self._to_pandas(*self._agg(_func, empty=empty))
[docs] def sumproduct(self, empty=None, normalized=False, dropna=False): """ Return the sum of all event values multiplied by the weights of the intersecting events. If normalized=False, the event values will be multiplied by the actual overlapping length (e.g., multiplying a per- mile value by the miles of overlap). If normalized=True, the event values will be multiplied by the normalized overlapping length (e.g., multiplying a total value of an overlapped event by the proportion of the event which is overlapped). Parameters ---------- empty : scalar, string, or other pd.Series-compatible value, optional Value to use to fill when there is no matching events group and aggregation cannot be performed. If None, values will be filled with np.nan. normalized : boolean, default False Whether the weights of the intersecting events being multiplied with the event values should be normalized by the total length of the events being intersected. dropna : boolean, default False Whether to drop np.nan values before aggregating. """ def _func(arr, trace, **kwargs): # Prepare weights data congruent with array data weights = trace.weights weights = np.tile(np.expand_dims(weights, 2), (1,1,self._ncols)) if normalized: weights /= trace.group_right.lengths # Drop nan if requested, zeroing weights where nan values occur if dropna: weights = np.where(np.isnan(arr.astype(float)), 0, weights) # Compute sums res = np.multiply( arr.reshape(1,-1,self._ncols), weights).sum(axis=1) return res return self._to_pandas(*self._agg(_func, empty=empty))
[docs] def mean(self, empty=None, weighted=True, dropna=False): """ Return an overlay length-weighted average of all event values. An unweighted simple average can also be computed if weighted=True. Parameters ---------- empty : scalar, string, or other pd.Series-compatible value, optional Value to use to fill when there is no matching events group and aggregation cannot be performed. If None, values will be filled with np.nan. weighted : boolean, default True Whether the computed average should be weighted. If False, an un-weighted average will be computed, giving all intersecting values an equal weight. dropna : boolean, default False Whether to drop np.nan values before aggregating. """ def _func(arr, trace, **kwargs): # Determine event weights weights = trace.weights if weighted else trace.mask weights = np.tile(np.expand_dims(weights, 2), (1,1,self._ncols)) # Drop nan if requested, zeroing weights where nan values occur if dropna: weights = np.where(np.isnan(arr.astype(float)), 0, weights) # Compute means numer = np.multiply( arr.reshape(1,-1,self._ncols), weights).sum(axis=1) denom = weights.sum(axis=1) denom = np.where(denom==0, np.nan, denom) res = np.divide(numer, denom) return res return self._to_pandas(*self._agg(_func, empty=empty))
[docs] def most(self, empty=None, dropna=True): """ Return the event value associated with the greatest total overlay length, ignoring missing values by default. Parameters ---------- empty : scalar, string, or other pd.Series-compatible value, optional Value to use to fill when there is no matching events group and aggregation cannot be performed. If None, values will be filled with np.nan. dropna : boolean, default False Whether to drop np.nan values in intersecting events before aggregating. """ def _func(arr, trace, **kwargs): # Iterate over 2nd dimension res = [] weights = trace.weights for arr_i in arr.T: # Drop nan if requested if dropna: nanmask = ~pd.isna(arr_i) if nanmask.sum() == 0: res.append( np.full(weights.shape[0], self._get_empty(empty))) continue else: arr_i = arr_i[nanmask] weights_i = weights[:,nanmask] else: weights_i = weights # Aggregate and add to result res.append(get_most(arr_i, weights_i)) return np.array(res).T return self._to_pandas(*self._agg(_func, empty=empty))
[docs] def mode(self, empty=None): """ Return the most frequent unique event value. Parameters ---------- empty : scalar, string, or other pd.Series-compatible value, optional Value to use to fill when there is no matching events group and aggregation cannot be performed. If None, values will be filled with np.nan. """ def _func(arr, trace, **kwargs): # Iterate over all event masks res = [] for mask_i in trace.mask: # Iterate over columns res_i = [get_mode(arr_i) for arr_i in arr[mask_i, :].T] res.append(res_i) return np.array(res) return self._to_pandas(*self._agg(_func, empty=empty))
[docs] def value_counts(self, expand=True, dropna=True): """ Return a dataframe of all unique intersecting event values and their occurence counts. Parameters ---------- expand : bool, default True Whether to automatically expand the value counts data to a dataframe when a single column is being analyzed. """ def _func(arr, trace, **kwargs): # Find null values in array if dropna: nanmask = ~pd.isna(arr.T) # Iterate over all event masks res = [] for mask_i in trace.mask: # Iterate over columns res_i = [] for j in range(arr.shape[1]): # Remove NaN if dropna: arr_i = arr[mask_i & nanmask[j]] else: arr_i = arr[mask_i] # Get counts of unique values unique, counts = np.unique( arr_i, return_counts=True, equal_nan=True) # Summarize in dictionary res_i.append( {val: count for val, count in zip(unique, counts)}) res.append(res_i) return np.array(res) # Generate data and expand if requested index, data = self._agg(_func, empty={}) if self._ndim == 1: if expand: return pd.DataFrame(data.flatten().tolist(), index=index) else: return self._to_pandas(index, data) else: return self._to_pandas(index, data)
[docs] def cut(self, empty=None, return_mls=True): """ Cut intersecting event routes at the intersecting begin and end locations, returning the resulting route's geometry or the route itself if requested. Parameters ---------- empty : scalar, string, or other pd.Series-compatible value, optional Value to use to fill when there is no matching events group and aggregation cannot be performed. If None, values will be filled with np.nan. return_mls : bool, default True Whether to return the MultiLineString associated with each cut MLRRoute instead of the route itself. """ if self._ncols != 1: raise ValueError("EventsMergeAttribute must represent a single " "column to perform cut.") def _func(arr, trace, **kwargs): # Iterate over all event masks indices = np.argmax(trace.mask, axis=1) gen = zip(indices, trace.group_left.begs, trace.group_left.ends) res = [] for i, beg, end in gen: # Choose the first intersecting event and cut the route route = arr[i, 0] try: res_i = route.cut(beg, end) res_i = res_i.mls if return_mls else res_i res.append(res_i) except AttributeError: raise TypeError( "EventsMergeAttribute must represent a single column " "and must contain MLSRoute objects to be cut.") return res return self._to_pandas(*self._agg( _func, empty=empty, as_array=False), as_series=True)
[docs] def interpolate(self, snap=None, point='begs', empty=None, **kwargs): """ Interpolate along intersecting event routes at the intersecting location (or begin point for linear events), returning the resulting interpolated point geometry. Parameters ---------- snap : {None, 'near', 'left', 'right'}, default None If the event location does not fall within any geometry, snap to the nearest match based on distance, choosing the closest location to the left, right, or the nearest side ('near'). If None, a value error will be raised when no intersecting ranges are found. point : {'begs', 'ends', 'centers'}, default 'begs' Where on the intersecting events the point should be made, at the begin, end, or center point of the range. empty : scalar, string, or other pd.Series-compatible value, optional Value to use to fill when there is no matching events group and aggregation cannot be performed. If None, values will be filled with np.nan. """ if self._ncols != 1: raise ValueError("EventsMergeAttribute must represent a single " "column to perform interpolate.") def _func(arr, trace, **kwargs): # Iterate over all event masks indices = np.argmax(trace.mask, axis=1) gen = zip(indices, getattr(trace.group_left, point)) res = [] for i, loc in gen: # Choose the first intersecting event and interpolate the point route = arr[i, 0] try: res_i = route.interpolate(loc, snap=snap, **kwargs) res.append(res_i) except AttributeError: raise TypeError( "EventsMergeAttribute must represent a single " "column and must contain MLSRoute objects to be " "interpolated.") return res return self._to_pandas(*self._agg( _func, empty=empty, as_array=False), as_series=True)
[docs]class EventsMergeTrace(object): """ Object class for managing data on the relationship between two events collections that have been merged using the EventsMerge system. Traces contain a few main elements: group_left, group_right : pointers to the left and right events groups that are related. During aggregation, information in the right group will be aggregated and formed to the dataframe underlying the left group. key : the unique key associated with both events groups that produces their relationship. mask : a boolean array of shape (group_left.df.shape[0], group_right.df.shape[0]), i.e., a number of rows equal to the number of rows in the left events group and a number of columns equal to the number of rows in the right events group. This mask defines all instances where the left and right groups intersect based on their defined ranges and closed parameters. weights : a numeric array of shape (group_left.df.shape[0], group_right.df.shape[0]), i.e., a number of rows equal to the number of rows in the left events group and a number of columns equal to the number of rows in the right events group. This array defines the actual numeric length that is overlapped between the individual events in the left and right events groups. success : a boolean indicator of whether or not a valid relationship has been discovered between the left group and any right group. When False, no right group will be indicated. """ def __init__(self, group_left=None, group_right=None, key=None, mask=None, weights=None, success=True): self.group_left = group_left self.group_right = group_right self.key = key self.mask = mask self.weights = weights self.success = success
[docs]class EventsMerge(object): """ High-level object class for managing merges between two events collections and summarizing/retrieving information from these merges. Generated through collection-level merging methods such as EventsCollection.merge. """ def __init__(self, left, right): # Log parameters self.left = left self.right = right def __getitem__(self, column) -> EventsMergeAttribute: return EventsMergeAttribute(self, column) def __repr__(self): text = ( f"left: {self.left}\n" f"right: {self.right}\n" f"(traces {'built' if hasattr(self, '_traces') else 'not built'})" ) return text @property def left(self): return self._left @left.setter def left(self, obj): self._validate_target(obj) self._left = obj @property def right(self): return self._right @right.setter def right(self, obj): # Validate and set self._validate_target(obj, left=False) self._right = obj @property def traces(self): try: return self._traces except AttributeError: self.build(inplace=True) return self._traces @traces.setter def traces(self, obj): if not isinstance(obj, list): raise TypeError("Traces must be a list of EventsMerge_trace " "objects.") self._traces = obj @property def num_keys(self): return self.left.num_keys @property def keys(self): return self.left.keys @property def columns(self): return self.right.columns def _validate_target(self, obj, left=True): # Ensure left is set first if the target is the right if not (left) and not (hasattr(self, '_left')): raise AttributeError("The left target must be set before the right " "target.") # Ensure type if not isinstance(obj, EventsCollection): raise TypeError("EventsMerge targets must be EventsCollections.") # Ensure matching keys try: assert obj.num_keys == self.num_keys except AttributeError: pass except AssertionError: raise ValueError( "Input EventsMerge target must have the same number of keys " f"as the existing left target ({self.num_keys}).")
[docs] def copy(self, deep=False): """ Create an exact copy of the events class instance. Parameters ---------- deep : bool, default False Whether the created copy should be a deep copy. """ if deep: return copy.deepcopy(self) else: return copy.copy(self)
[docs] def build(self, inplace=True): """ Perform intersects and overlays to produce EventsMergeTrace objects for aggregation. """ # Iterate over EventsGroups in the left collection traces = [] for key, group_left in self.left.iter_groups(): # Get the associated EventsGroup from the right collection try: group_right = self.right.get_group(key, empty=False) except KeyError: # No matching group on the right? Log empty trace traces.append( EventsMergeTrace( group_left=group_left, key=key, success=False)) continue # Perform intersection mask = group_left.intersecting( other=group_right, squeeze=False, get_mask=True) weights = group_left.overlay( other=group_right, normalize=False, squeeze=False) # Aggregate results traces.append( EventsMergeTrace( group_left=group_left, group_right=group_right, key=key, mask=mask, weights=weights, success=True)) # Log traces self.traces = traces
[docs] def distribute(self, column=None, squeeze=True, **kwargs): """ Intersect and distribute events over the range collection, scaling their values relative to their indexed distance from their intersecting range location. Parameters ---------- column : pandas column label or list of same, optional The events dataframe column(s) containing the values associated with each event being analyzed. If not provided, all values will default to be 1. blur_size : int, default 0 The number of pixels to blur events across based on the blur style. blur_style : str or callable, default 'linear' The scaling function to be called at each blurring step to scale original values. If a callable is provided, it must accept a single integer input for the zero-indexed pixel number, returning a single float scaling value. Predefined blurring functions can be called using the following labels: Options ------- linear : linearly scale down values from the original value to zero at the first index outside the blurred pixel range norm_static : Tk norm_scale : Tk none : do not scale down original values length_normalize : bool, default True Normalize the intersection scores by the length of the range to account for differing range lengths. Created: 2022-10-04 """ # Iterate over EventsGroups in the left collection index = [] res = [] for key, group_left in self.left.iter_groups(): # Get the associated EventsGroup from the right collection group_right = self.right.get_group(key, empty=True) # Get event values if not column is None: if isinstance(column, list): values = group_right.df[column].values else: values = group_right.df[[column]].values else: values = None # Perform distribution res_i = group_left.rng.distribute( group_right.rng, values=None, **kwargs) # Aggregate results over all column values if not column is None: res_i = (np.expand_dims(res_i, 2) * values).sum(axis=1) else: res_i = res_i.sum(axis=1) # Log results index.append(group_left.df.index) res.append(res_i) # Synthesize into pandas object index = np.concatenate(index) data = np.concatenate(res) if column is None: obj = pd.Series(index=index, data=data, name=None) elif isinstance(column, list): if len(column)==1 and squeeze: obj = pd.Series(index=index, data=data, name=column[0]) else: obj = pd.DataFrame(index=index, data=data, columns=column) else: obj = pd.Series(index=index, data=data, name=column) return obj
[docs] def cut(self, **kwargs): """ Cut intersecting event routes at the intersecting begin and end locations, returning the resulting route's geometry or the route itself if requested. Parameters ---------- empty : scalar, string, or other pd.Series-compatible value, optional Value to use to fill when there are no intersecting events and aggregation cannot be performed. If None, values will be filled with np.nan. return_mls : bool, default True Whether to return the MultiLineString associated with each cut MLRRoute instead of the route itself. """ # Get attribute for routes if available try: ema = self[self.right.route] except: raise ValueError("Right collection does not contain a valid " "routes column label.") # Perform cut return ema.cut(**kwargs)
[docs] def interpolate(self, **kwargs): """ Interpolate along intersecting event routes at the intersecting location (or begin point for linear events), returning the resulting interpolated point geometry. Parameters ---------- snap : {None, 'near', 'left', 'right'}, default None If the event location does not fall within any geometry, snap to the nearest match based on distance, choosing the closest location to the left, right, or the nearest side ('near'). If None, a value error will be raised when no intersecting ranges are found. point : {'begs', 'ends', 'centers'}, default 'begs' Where on the intersecting events the point should be made, at the begin, end, or center point of the range. empty : scalar, string, or other pd.Series-compatible value, optional Value to use to fill when there are no intersecting events and aggregation cannot be performed. If None, values will be filled with np.nan. """ # Get attribute for routes if available try: ema = self[self.right.route] except: raise ValueError("Right collection does not contain a valid " "routes column label.") # Perform cut return ema.interpolate(**kwargs)
[docs] def count(self, **kwargs): """ Count the number of intersecting events. Parameters ---------- empty : scalar, string, or other pd.Series-compatible value, optional Value to use to fill when there is no matching events group and aggregation cannot be performed. If None, values will be filled with np.nan. """ # Get the first key attribute ema = self[self.right.keys[0]] # Perform aggregation return ema.count(**kwargs)
[docs] def any(self, **kwargs): """ Indicate whether each record intersects with at least one event. Parameters ---------- empty : scalar, string, or other pd.Series-compatible value, optional Value to use to fill when there is no matching events group and aggregation cannot be performed. If None, values will be filled with np.nan. """ # Get the first key attribute ema = self[self.right.keys[0]] # Perform aggregation return ema.any(**kwargs)
########### # HELPERS # ###########
[docs]def get_most(arr, weights): """ Select the item from the input 1D array which is associated with the highest total weight from each row in the 2D weights array. Scores are computed by summing the weights for each unique array value for each row of weights. When multiple values are tied, the first item in sorted order will be selected. """ # Group and split sorted target array sorter = np.argsort(arr) unique, splitter = np.unique(arr[sorter], return_index=True) splitter = splitter[1:] # Split weights and aggregate to select highest scoring values splits = np.split(weights[:, sorter], splitter, axis=1) select = unique[np.argmax([x.sum(axis=1) for x in splits], axis=0)] return select
[docs]def get_mode(arr): """ Select the item from the input array which appears most frequently. Parameters ---------- arr : array-like Array with target values """ # Enforce numpy array arr = np.asarray(arr) # Find most frequent unique value and return unique, counts = np.unique(arr, return_counts=True) return unique[np.argmax(counts)]
##################### # LATE DEPENDENCIES # ##################### from linref.events.collection import EventsCollection