"""
===============================================================================
Module featuring EventsCollection and EventsGroup object classes for the
management of linear referencing events data and optimized performance of
various events operations including dissolves, automated intersections
and attribute retrievals, linear overlays, and more.
EventsCollection class instances represent complex events data sets with
multiple groups of events which are distinguished by at least one set of keys
(e.g., years of data or inventory categories). These collections can be used
for a variety of linear referencing operations and events manipulations, such
as dissolves based on a subset of events columns, returning a simplified
data set with a selection of columns aggregated. Additionally, these
collections can be used to perform automated merges and intersections with
other EventsCollection class instances using the .merge() method, retrieving
column data from another collection and relating it to the original
collection's events data.
EventsGroup class instances represent simple events data sets with a single
group of contiguous events. These groups can be used for a variety of linear
referencing operations such as overlays to determine portions of events
overlapped by an input range, intersections to determine which events intersect
with an input range, length-weighted averages of event column values based on
an input range, and more.
EventsCollection class instances can be queried using square bracket indexing
or the .get_subset() and .get_group() methods, returning a pared down
EventsCollection or a specific EventsGroup, respectively. Similarly, this can
be done using object indexing, passing a mixture of unique values and valid
slices of unique key values to return a subset of the collection as an
EventsCollection instance, or just unique key values to return a unique group
as an EventsGroup instance.
Classes
-------
EventsCollection, EventsGroup
Dependencies
------------
pandas, geopandas, numpy, shapely, copy, warnings, rangel
Examples
--------
Create an events collection for a sample roadway events dataframe with unique
route identifier represented by the 'Route' column and data for multiple years,
represented by the 'Year' column. The begin and end mile points are defined by
the 'Begin' and 'End' columns.
>>> ec = EventsCollection(df, keys=['Route','Year'], beg='Begin', end='End')
To select events from a specific route and a specific year, indexing for all
keys can be used, producing an EventsGroup.
>>> eg = ec['Route 50', 2018]
To select events on all routes but only those from a specific year, indexing
for only some keys can be used.
>>> ec_2018 = ec[:, 2018]
To retrieve information from one events collection and apply it to the events
of another, the merge() method can be used.
>>> ec.merge()
To get all events which intersect with a numeric range, the intersecting()
method can be used on an EventsGroup instance.
>>> df_intersecting = eg.intersecting(0.5, 1.5, closed='left_mod')
The intersecting() method can also be used for point locations by omitting the
second location attribute.
>>> df_intersecting = eg.intersecting(0.75, closed='both')
The linearly weighted average of one or more attributes can be obtained using
the overlay_average() method.
>>> df_overlay = eg.overlay_average(0.5, 1.5, cols=['Speed_Limit','Volume'])
If the events include information on the roadway speed limit and number of
lanes, they can be dissolved on these attributes. During the dissolve, other
attributes can be aggregated, providing a list of associated values or
performing an aggregation function over these values.
>>> ec_dissolved = ec.dissolve(attr=['Speed_Limit','Lanes'], aggs=['County'])
Development
-----------
Developed by:
Tariq Shihadah, tariq.shihadah@gmail.com
Created:
10/22/2019
Modified:
3/3/2021
===============================================================================
"""
################
# DEPENDENCIES #
################
import pandas as pd
import geopandas as gpd
import numpy as np
from shapely.geometry import LineString, MultiLineString, Point
from shapely.ops import linemerge
from linref.route import MLSRoute, combine_mpgs
from rangel import RangeCollection
import copy, warnings
##################
# EVENTS CLASSES #
##################
[docs]class EventsFrame(object):
"""
High-level class for managing linear events data. Users should instead use
the EventsCollection class for complex data sets with multiple groups of
events, grouped by at least one key column (e.g., route ID), or the
EventsGroup class for simple data sets with only a single group of events.
"""
# Default standard column values
default_keys = ['RID', 'YEAR', 'KEY']
default_beg = ['BMP', 'BEG', 'FROM', 'LOC']
default_end = ['EMP', 'END', 'TO']
default_geom = ['geometry']
def __init__(
        self, df, keys=None, beg=None, end=None, geom=None, route=None,
        closed=None, sort=False, **kwargs):
    """
    Register the events dataframe together with its key, location,
    geometry, route, and interval-closure settings.

    Parameters
    ----------
    df : pd.DataFrame
        Events dataframe.
    keys : label or list-like of labels, optional
        Key column label(s); handled by the keys setter.
    beg : label
        Begin location column; the beg setter rejects None.
    end : label, optional
        End location column; the end setter falls back to beg when None.
    geom, route : label, optional
        Geometry and route column labels.
    closed : str, optional
        Interval closure option, forwarded to set_closed().
    sort : bool, default False
        Whether the df setter should sort the dataframe.
    **kwargs
        Accepted but not used by this constructor.
    """
    super().__init__()
    # The raw dataframe must be in place first: the property setters below
    # validate their labels against self.df.columns.
    self._df = df
    settings = (
        ('keys', keys),
        ('beg', beg),
        ('end', end),
        ('geom', geom),
        ('route', route),
        ('closed', closed),
    )
    for name, value in settings:
        setattr(self, name, value)
    self._sort = sort
    # Finally pass the dataframe through the df setter for validation,
    # optional sorting, and grouping.
    self.df = df
def __repr__(self):
# Define representation components
nm = f"{self.__class__.__name__}"
kwargs = ['df']
if self.num_keys > 0:
kwargs.append(f"""keys=['{"','".join(self.keys)}']""")
kwargs.append(f"beg='{self.beg}'")
kwargs.append(f"end='{self.end}'")
if not self.geom is None:
kwargs.append(f"geom='{self.geom}'")
kwargs.append(f"closed='{self.closed}'")
# Return text
text = f"{nm}({', '.join(kwargs)})"
return text
def __iter__(self):
return (self.get_group(key) for key in self.group_keys_unique)
@property
def df(self):
"""
The collection's events dataframe.
"""
return self._df
@df.setter
def df(self, df):
# Validate input
if isinstance(df, pd.DataFrame) or isinstance(df, gpd.GeoDataFrame):
# Sort dataframe
self._df = df
if self._sort:
self._df = self._sort_df(df)
# Define the key groups
if self.num_keys > 0:
self._groups = self._df.groupby(by=self.keys)
else:
self._groups = None
# Retrieve default geometry column if df is geodataframe
if isinstance(df, gpd.GeoDataFrame) and self.geom is None:
self.geom = df.geometry.name
# Reset logs
try:
self.log.reset()
except:
pass
self._initialize_df()
else:
raise TypeError(
"Input dataframe must be pandas DataFrame class instance.")
@property
def size(self):
"""
Return the size of the events dataframe.
"""
return self._df.size
@property
def shape(self):
"""
Return the shape of the events dataframe.
"""
return self._df.shape
def _initialize_df(self):
"""
Class-specific dataframe initialization processes.
"""
pass
def _sort_df(self, df):
"""
Sort the given dataframe by the collection's keys and begin/end
columns, returning the sorted dataframe.
"""
return df.sort_values(
by=self.keys + [self.beg, self.end], ascending=True)
def set_df(self, obj, inplace=False):
    """
    Assign a new events dataframe through the df setter.

    Parameters
    ----------
    obj : pd.DataFrame
        The dataframe to assign.
    inplace : boolean, default False
        If True, modify this instance and return None; otherwise assign
        the dataframe on a copy and return that copy.
    """
    target = self.copy() if not inplace else self
    target.df = obj
    return None if inplace else target
def sort(self, inplace=False):
    """
    Flag the events object as sorted and re-assign its dataframe so the df
    setter sorts it by the key and begin/end columns.

    Parameters
    ----------
    inplace : boolean, default False
        If True, modify this instance and return None; otherwise operate
        on a copy and return that copy.
    """
    target = self if inplace else self.copy()
    # Turn sorting on before re-assigning so the df setter applies it
    target._sort = True
    target.df = self.df
    return None if inplace else target
def cast_gdf(self, inplace=False, **kwargs):
    """
    Convert the events dataframe to a geodataframe.

    Parameters
    ----------
    inplace : boolean, default False
        If True, assign the geodataframe to this instance and return None;
        otherwise assign it to a copy and return that copy.
    **kwargs
        Keyword arguments forwarded to the gpd.GeoDataFrame constructor
        (e.g., crs, geometry).
    """
    # Build the geodataframe first so constructor errors surface before
    # any copy is made
    geo_frame = gpd.GeoDataFrame(self.df, **kwargs)
    target = self if inplace else self.copy()
    target.df = geo_frame
    return None if inplace else target
[docs] def df_exportable(self):
"""
Return a dataframe which is optimized for exporting.
"""
# Create a copy of the events dataframe
df = self.df.copy()
# Convert route data to wkt
try:
df[self.route] = \
df[self.route].apply(lambda x: x.wkt)
except:
# Remove route column
df = df.drop(columns=[self.route], errors='ignore')
return df
@property
def keys(self):
"""
The list of column names within the events dataframe which are queried
to define specific events groups (e.g., events on a specific route).
"""
return self._keys
@property
def key_locs(self):
return self._key_locs
@keys.setter
def keys(self, keys):
# Address null input
if keys is None:
keys = []
# Validate input type
elif isinstance(keys, str):
# If string, assume single column reference
keys = [keys]
else:
try:
# Validate list-like
keys = list(keys)
except TypeError:
raise TypeError("Input key column name(s) must be a string or "
"list-like of strings which refer to valid columns within "
"the collection's events dataframe.")
# Validate presence within events dataframe
for key in keys:
if not key in self.df.columns:
raise ValueError(f"Key column value '{key}' is not present "
"within the collection's events dataframe.")
# Log validated keys
self._keys = keys
self._key_locs = [self.columns.index(key) for key in keys]
@property
def num_keys(self):
"""
The number of key columns within self.keys.
"""
return len(self.keys)
@property
def key_values(self):
"""
A dictionary of valid values for each key column.
"""
# Identify all unique values for each key
values = {col:self.df[col].unique() for col in self.keys}
return values
@property
def columns(self):
"""
A list of all columns within the events dataframe.
"""
return self._df.columns.values.tolist()
@property
def targets(self):
"""
A list of begin, end, and key columns within the events dataframe.
"""
# Define target columns
targets = [self.beg, self.end] + self.keys
return targets
@property
def others(self):
"""
A list of columns within the events dataframe which are not the begin,
end, or key columns.
"""
# Define other columns
others = [col for col in self.df.columns if not col in self.targets]
return others
@property
def groups(self):
"""
The pandas GroupBy of the events dataframe, grouped by the collection's
key columns. This defines the basis for key queries.
"""
return self._groups
@property
def group_keys(self):
return list(map(tuple, self.df.values[:, self.key_locs]))
@property
def group_keys_unique(self):
return list(set(map(tuple, self.df.values[:, self.key_locs])))
@property
def beg(self):
return self._beg
@property
def beg_loc(self):
return self._beg_loc
@property
def begs(self):
return self.df.values[:, self.beg_loc]
@beg.setter
def beg(self, beg):
# Address null input
if beg is None:
raise ValueError("Begin location column cannot be None.")
# Validate presence within events dataframe
elif not beg in self.df.columns:
raise ValueError(f"Begin location column name '{beg}' is not "
"present within the collection's events dataframe.")
# Log validated keys
self._beg = beg
self._beg_loc = self.columns.index(beg)
@property
def end(self):
return self._end
@property
def end_loc(self):
return self._end_loc
@property
def ends(self):
return self.df.values[:, self.end_loc]
@end.setter
def end(self, end):
# Address null input
if end is None:
end = self.beg
# Validate presence within events dataframe
elif not end in self.df.columns:
raise ValueError(f"End location column name '{end}' is not "
"present within the collection's events dataframe.")
# Log validated keys
self._end = end
self._end_loc = self.columns.index(end)
@property
def geom(self):
return self._geom
@property
def geom_loc(self):
return self._geom_loc
@geom.setter
def geom(self, geom):
# Address null input
if geom is None:
pass
# Validate presence within events dataframe
elif not geom in self.df.columns:
raise ValueError(f"Geometry column name '{geom}' is not "
"present within the collection's events dataframe.")
# Log validated keys
self._geom = geom
self._geom_loc = self.columns.index(geom) if not geom is None else None
@property
def is_point(self):
"""
Returns True if the collection's beg and end columns are the same,
implying that it is a collection of point events.
"""
return self._beg == self._end
@property
def route(self):
return self._route
@property
def route_loc(self):
return self._route_loc
@route.setter
def route(self, route):
# Address null input
if route is None:
pass
# Validate presence within events dataframe
elif not route in self.df.columns:
raise ValueError(f"Geometry column name '{route}' is not "
"present within the collection's events dataframe.")
# Log validated keys
self._route = route
self._route_loc = self.columns.index(route) \
if not route is None else None
[docs] def parse_routes(self, col=None, inplace=False, errors='raise'):
"""
Parse MLSRoutes data in the provided column, which contains either
MLSRoute objects, WKT data for MULTILINESTRINGs or LINESTRINGs with
M-values, or a mixture of both.
Parameters
----------
col : label, optional
A valid column label within the events dataframe which contains the
target MLSRoute data. If not provided, will attempt to retrieve a
previously assigned column label from the self.route property.
inplace : boolean, default False
Whether to perform the operation in place. If False, will return a
modified copy of the events object.
errors : {'raise','ignore'}
How to address errors which arise when coercing MLSRoute data
during processing. If ignored, errors will result in null values
in the events dataframe where errors occurred.
"""
# Check column
if col is None:
try:
col = self._route
except:
raise ValueError("No route column label provided.")
# Coerce data
def _to_routes(x):
if isinstance(x, MLSRoute):
return x
elif isinstance(x, str):
try:
return MLSRoute.from_wkt(x)
except Exception as e:
if errors=='raise':
raise e
else:
return
else:
if errors=='raise':
raise TypeError(
"Route data must be MLSRoute object or WKT valid "
"string.")
else:
return
routes = self.df[col].apply(_to_routes)
# Apply update
if inplace:
self.df[col] = routes
self.route = col
return
else:
ec = self.copy(deep=True)
ec.df[col] = routes
ec.route = col
return ec
@property
def closed(self):
"""
Collection parameter for whether event intervals are closed on the
left-side, right-side, both or neither.
"""
return self._closed
@closed.setter
def closed(self, closed):
self.set_closed(closed, inplace=True)
@property
def shape(self):
return self.df.shape
def _validate_cols(self, cols=None, require=False):
"""
Process input columns as list, string, or None, converting to list.
"""
# Validate column inputs and coerce list type
if cols is None:
if require:
raise ValueError("Must provide at least one column label.")
else:
cols = []
elif isinstance(cols, tuple) or isinstance(cols, list):
cols = list(cols)
else:
cols = [cols]
# Check presence in dataframe
try:
# Check for presence in events dataframe
for col in cols:
assert col in self.df.columns
except ValueError:
raise ValueError("Provided column labels must exist within the "
"events dataframe.")
except AssertionError:
raise ValueError(f"Column '{col}' does not exist within the "
"events dataframe.")
# Return validated columns
return cols
[docs] def iter_groups(self):
"""
Return an iterator which will iterate through all groups in the
collection, yielding each group's key as well as the associated
EventsGroup.
"""
return ((key, self.get_group(key)) for key in self.group_keys_unique)
[docs] def build_routes(self, label='route', errors='raise'):
"""
Build MLSRoute instances for each event based on available geometry
and begin and end locations.
Parameters
----------
label : valid pandas column label
Column label to use for newly generated column populated with
routes data.
errors : {'raise','ignore'}
How to address errors if they arise when producing routes. If
errors are not raised, inviable records in the new column will
be filled with np.nan.
"""
# Validate
if self.geom is None:
raise ValueError("No geometry column label defined.")
# Build routes
locs = (self.beg_loc, self.end_loc, self.geom_loc)
routes = []
for beg, end, geom in self.df.values[:, locs]:
try:
routes.append(MLSRoute.from_lines(geom, beg, end))
except Exception as e:
if errors=='ignore':
routes.append(np.nan)
else:
raise e
self.df[label] = routes
self._route = label
[docs] def copy(self, deep=False):
"""
Create an exact copy of the events class instance.
Parameters
----------
deep : bool, default False
Whether the created copy should be a deep copy.
"""
if deep:
return copy.deepcopy(self)
else:
return copy.copy(self)
[docs] def set_closed(self, closed=None, inplace=False):
"""
Change whether ranges are closed on left, right, both, or neither side.
Parameters
----------
closed : str {'left', 'left_mod', 'right', 'right_mod', 'both',
'neither'}, optional
Whether intervals are closed on the left-side, right-side, both or
neither. If None, will default to 'left_mod' for linear events and
'both' for point events.
inplace : boolean, default False
Whether to perform the operation in place on the parent range
collection, returning None.
"""
# Ensure valid option selected
if closed is None:
if self.is_point:
closed = 'both'
else:
closed = 'left_mod'
elif not closed in RangeCollection._ops_closed:
raise ValueError(
"Closed parameter must be one of "
f"{RangeCollection._ops_closed}.")
# Apply parameter
if inplace:
self._closed = closed
else:
ec = self.copy()
ec._closed = closed
return ec
def geometry_from_xy(self, x, y, col_name='geometry', crs=None,
        inplace=False):
    """
    Use X and Y coordinates in the events dataframe to generate point
    geometry.

    Parameters
    ----------
    x, y : label
        Column labels within the events dataframe holding the X and Y
        coordinates.
    col_name : label, default 'geometry'
        Label of the column which will hold the generated geometry.
    crs : optional
        Coordinate reference system passed to the gpd.GeoDataFrame
        constructor.
    inplace : boolean, default False
        Whether to perform the operation in place, returning None. If
        False, a modified copy is returned and this instance's dataframe
        is left untouched.
    """
    # Validate columns
    x, y = self._validate_cols(cols=[x, y])
    # Generate geometry
    geometry = self.df.apply(lambda r: Point(r[x], r[y]), axis=1)
    # Apply geometry to a copy of the dataframe; the shallow instance copy
    # shares its dataframe with this instance, so writing the column
    # directly would also modify the original when inplace=False
    ef = self if inplace else self.copy()
    df = ef.df.copy()
    df[col_name] = geometry
    ef.df = gpd.GeoDataFrame(df, geometry=col_name, crs=crs)
    ef.geom = col_name
    return None if inplace else ef
def dissolve(self, attr=None, aggs=None, agg_func=None, agg_suffix='_agg',
        agg_geometry=False, agg_routes=False, dropna=False, fillna=None,
        reorder=True, merge_lines=True):
    """
    Dissolve the events dataframe on a selection of event attributes.

    Note: Data will be sorted by keys and begin/end columns prior to
    performing the dissolve.

    Note: Missing data in selected attribute fields may cause problems with
    dissolving; please use df.fillna(...) or df.dropna(...) to avoid this
    problem.

    Parameters
    ----------
    attr : str or list
        Which event attribute(s) within the events dataframe to dissolve
        on.
    aggs : str or list, default None
        Which event attribute(s) within the events dataframe to aggregate
        during the dissolve. Attributes will be aggregated into a list
        and returned under the same attribute name plus the suffix.
    agg_func : callable function or list of callable functions, default None
        A function or list of functions corresponding to the list of
        aggregation attributes which will be called on the list-aggregated
        contents of those attributes. A provided list is not modified.
    agg_suffix : str or list, default '_agg'
        A suffix to be added to the name of aggregated columns. If provided
        as a list, must correspond to provided list of aggregation
        attributes. A provided list is not modified.
    agg_geometry : bool, default False
        Whether to create an aggregated geometries field, populated with
        aggregated shapely geometries based on those contained in the
        collection's geometry field.
    agg_routes : bool, default False
        Whether to create an aggregated routes field, populated with
        MLSRoute object class instances, created based on aggregated
        segment geometries and begin and end mile posts.
    dropna : bool, default False
        Whether to drop records with empty values in the attribute fields.
        This parameter is passed to the df.groupby call.
    fillna : optional
        A value or dictionary used to fill instances of np.nan in the
        target dataframe. Consistent with the DataFrame.fillna() method.
    reorder : bool, default True
        Whether to reorder the resulting dataframe columns to match the
        order of the collection's events dataframe.
    merge_lines : bool, default True
        Whether to use shapely's ops.linemerge function to combine
        contiguous linestrings when aggregating linear geometries. Only
        applicable when agg_geometry=True.

    Returns
    -------
    EventsCollection
        A collection built on the dissolved events dataframe.
    """
    # Validate inputs
    # - Create, sort dummy dataframe
    df = self._sort_df(self.df.copy())
    col_order = list(df.columns)
    df['__DUMMY__'] = True # Dummy data guarantees >0 groupby keys
    # - Dissolve attributes
    attr = ['__DUMMY__'] + self.keys + self._validate_cols(attr)
    aggs = self._validate_cols(aggs)
    # - Aggregation functions
    if agg_func is None:
        agg_func = [None for _ in aggs]
    elif callable(agg_func):
        agg_func = [agg_func for _ in aggs]
    elif isinstance(agg_func, list):
        if len(agg_func) != len(aggs):
            raise ValueError("Aggregation functions must be "
                "provided as a single callable function or a list of "
                "functions the same length as the list of aggregation "
                "attributes.")
        # Copy so later appends do not modify the caller's list
        agg_func = list(agg_func)
    else:
        raise ValueError("Aggregation functions must be provided as "
            "a single callable function or a list of functions the "
            "same length as the list of aggregation attributes.")
    # - Aggregation suffixes
    if agg_suffix is None:
        agg_suffix = ['' for _ in aggs]
    elif isinstance(agg_suffix, str):
        agg_suffix = [agg_suffix for _ in aggs]
    elif isinstance(agg_suffix, list):
        if len(agg_suffix) != len(aggs):
            raise ValueError("Aggregation suffixes must be provided as a "
                "single string or a list of strings the same length as "
                "the list of aggregation attributes.")
        # Copy so later appends do not modify the caller's list
        agg_suffix = list(agg_suffix)
    else:
        raise ValueError("Aggregation suffixes must be provided as a "
            "single string or a list of strings the same length as the "
            "list of aggregation attributes.")

    # Additional aggregation requests
    # - Prepare geometry dissolve if requested
    if agg_geometry:
        # Confirm valid geometry field
        if self.geom is None:
            raise ValueError("Collection does not include an identified "
                "geometry field. This must be provided at initialization "
                "of the collection or by setting it directly.")
        # Create geometry aggregation function
        if merge_lines:
            func = lambda x: linemerge(combine_mpgs(x, cls=MultiLineString))
        else:
            func = lambda x: combine_mpgs(x, cls=MultiLineString)
        # Append geometry field name to aggregation list
        aggs.append(self.geom)
        agg_func.append(func)
        agg_suffix.append('')
    # - Prepare route dissolve if requested
    if agg_routes:
        # Confirm valid geometry field
        if self.geom is None:
            raise ValueError("Collection does not include an identified "
                "geometry field. This must be provided at initialization "
                "of the collection or by setting it directly.")
        # Create route information feed of (geometry, begin, end) tuples
        route_feed_col = 'route'
        build_feed = lambda r: \
            (r[self.geom], r[self.beg], r[self.end])
        df[route_feed_col] = df.apply(build_feed, axis=1)
        # Create route aggregation function
        func = lambda x: MLSRoute.from_lines(*list(zip(*x)))
        # Append routes field name to aggregation list
        aggs.append(route_feed_col)
        agg_func.append(func)
        agg_suffix.append('')

    # Prepare for dissolve
    # - Process selected columns for valid groupby
    select_cols = [self.beg, self.end]
    select_cols += [x for x in attr if x not in select_cols]
    select_cols += [x for x in aggs if x not in select_cols]
    df = df[select_cols]
    df = df.fillna(fillna) if fillna is not None else df
    # - Group events data
    grouped = df.groupby(by=attr, dropna=dropna) \
        [[self.beg, self.end] + aggs].agg(list)
    beg_groups = grouped[self.beg].to_list()
    end_groups = grouped[self.end].to_list()
    agg_groups = grouped[aggs] if len(aggs) > 0 else None

    # Iterate through groups of data and define new dataframe records
    records = []
    for index, begs_i, ends_i in zip(grouped.index, beg_groups, end_groups):
        # Identify breaks between consecutive events
        rc = RangeCollection(begs=begs_i, ends=ends_i, centers=None,
            copy=False, sort=False)
        consecutive = rc.are_consecutive(all_=False, when_one=True)
        splitter = (np.where(np.invert(consecutive))[0] + 1).tolist()
        # Get aggregation data
        lin_ranges = np.split(np.stack([begs_i, ends_i]), splitter, axis=1)
        if agg_groups is not None:
            try:
                agg_data = agg_groups.loc[index, :].to_list()
                agg_ranges = [[agg[i:j] for agg in agg_data] for i, j in \
                    zip([None]+splitter, splitter+[None])]
            except KeyError:
                raise KeyError(
                    f"Unable to retrieve data group with index {index}. "
                    "This may be due to nan data in one or more of the "
                    "dissolving attributes.")
        else:
            # Endless supply of fresh empty lists, one per linear range
            agg_ranges = iter(list, 1)
        # Enforce grouped index as a list
        index = list(index) if len(attr) > 1 else [index]
        # Iterate over ranges and store data in records
        for lin_range, agg_range in zip(lin_ranges, agg_ranges):
            records.append([lin_range[0].min(), lin_range[1].max()] \
                + index[1:] + agg_range) # Remove dummy column data

    # Create new dataframe with dissolved results
    aggs = [agg + suf for agg, suf in zip(aggs, agg_suffix)]
    res_cols = [self.beg, self.end] + attr[1:] + aggs # Remove dummy column
    res = pd.DataFrame.from_records(data=records, columns=res_cols)
    # Apply aggregation functions if requested
    for col, func in zip(aggs, agg_func):
        if func is not None:
            res.loc[:,col] = res.loc[:,col].apply(func)
    # Reorder columns and records
    if reorder:
        col_order = [c for c in col_order if c in res.columns] + \
            [c for c in res.columns if c not in col_order]
        res = res[col_order]
    # NOTE(review): records are sorted regardless of the reorder flag here;
    # confirm this matches the intended nesting of the original statement.
    res = res.sort_values(by=self.keys+[self.beg,self.end],
        axis=0, ascending=True)
    # Convert to geodataframe if geometry is aggregated; a plain DataFrame
    # has no crs attribute, in which case no crs is assigned
    if agg_geometry:
        res = gpd.GeoDataFrame(
            res, geometry=self.geom, crs=getattr(self.df, 'crs', None))
    # Generate events collection
    ec = EventsCollection(res, keys=self.keys, beg=self.beg, end=self.end,
        geom=self.geom if agg_geometry else None,
        route='route' if agg_routes else None,
        closed=self.closed, missing_data='ignore')
    return ec
def project(self, other, buffer=100, nearest=True, loc_label='LOC',
        dist_label='DISTANCE', build_routes=True, **kwargs):
    """
    Project the geometries of an input geodataframe onto the events,
    producing linearly referenced point locations relative to the events
    for all input geometries within a buffered search area.

    Parameters
    ----------
    other : gpd.GeoDataFrame
        Geodataframe containing geometry which will be projected onto the
        events dataframe.
    buffer : float, default 100
        The max distance to search for input geometries to project against
        the events' geometries. Measured in terms of the geometries'
        coordinate reference system.
    nearest : bool, default True
        Whether to choose only the nearest match within the defined buffer.
        If False, all matches will be returned. If True, when multiple
        equidistant points exist, the first result that appears is kept.
    loc_label : label, default 'LOC'
        Label of the created column holding projected locations.
    dist_label : label, default 'DISTANCE'
        Accepted for labelling nearest point distances.
        NOTE(review): this argument is not referenced in the method body.
    build_routes : bool, default True
        Whether to automatically build routes using the build_routes()
        method if routes are not already available.
    **kwargs
        Keyword arguments to be passed to the class constructor upon
        completion of the projection.

    Returns
    -------
    An instance of this object's class built on the joined dataframe, with
    loc_label as the begin column.
    """
    # Validate input geodataframe
    if not isinstance(other, gpd.GeoDataFrame):
        raise TypeError("Other object must be gpd.GeoDataFrame instance.")
    try:
        other_geometry = other.geometry.name
    except AttributeError:
        raise AttributeError(
            "No geometry data set in other geodataframe.")
    other = other.copy()

    # Check for invalid column names
    if self.route in other.columns:
        raise ValueError(
            f"Invalid column name '{self.route}' found in target "
            "geodataframe.")
    shared_keys = set(self.keys) & set(other.columns)
    if shared_keys:
        invalid = set(self.keys) & set(other.columns)
        raise ValueError(
            f"Target geodataframe contains at least one events collection "
            f"key column name {invalid}.")

    # Ensure that geometries and routes are available
    if self.geom is None:
        raise ValueError(
            "No geometry found in events dataframe. If valid shapely "
            "geometries are available in the dataframe, set this with the "
            f"{self.__class__.__name__}'s geom property.")
    if self.route is None:
        if not build_routes:
            raise ValueError(
                "No routes found in events dataframe. If valid shapely "
                "geometries are available in the dataframe, create routes "
                "by calling the build_routes() method on the "
                f"{self.__class__.__name__} class instance.")
        self.build_routes()

    # Join the other geodataframe to this one
    select_cols = self.keys + [self.route, self.geom]
    if nearest:
        joined = other.sjoin_nearest(
            self.df[select_cols],
            max_distance=buffer,
            how='left'
        )
        # Drop duplicates (required for equidistant ties)
        keep_mask = ~joined.index.duplicated(keep='first')
        joined = joined[keep_mask]
    else:
        # Buffer geometry for spatial join
        buffered_geoms = self.df.geometry.buffer(buffer)
        joined = other.sjoin(
            self.df[select_cols].set_geometry(buffered_geoms),
            how='left'
        )

    # Project input geometries onto event routes; rows whose route value
    # lacks a project attribute yield None
    def _project(r):
        try:
            return r[self.route].project(r[other_geometry])
        except AttributeError:
            return None

    joined[loc_label] = joined.apply(_project, axis=1)

    # Prepare and return data
    result_df = joined.drop(columns=[self.route])
    return self.__class__(
        result_df,
        keys=self.keys,
        beg=loc_label,
        closed=self.closed,
        missing_data='ignore',
        **kwargs
    )
def to_grid(self, dissolve=False, **kwargs):
    """
    Use the events dataframe to create a grid of zero-length, equidistant
    point events which span the bounds of each event.

    Parameters
    ----------
    length : numerical, default 1.0
        A fixed distance between each point on the grid.
    fill : {'none','cut','extend','right','balance'}, default 'cut'
        How to fill a gap at the end of an event's range.

        Options
        -------
        none : no point will be generated at the end of the input range
            unless it falls directly on the defined grid distance.
        cut : a point will be generated at the very end of the input range,
            at a distance less than or equal to the defined grid distance.
        right : the final point will be generated at a distance equal to
            the defined grid distance, even if this extends beyond the full
            input range.
        extend : a point will be generated at the very end of the input
            range, at a distance greater than or equal to the defined grid
            distance.
        balance : if the final range is greater than or equal to half the
            target range length, perform the cut method; if it is less,
            perform the extend method.
    dissolve : bool, default False
        Whether to dissolve the events dataframe before performing the
        transformation.
    **kwargs
        Forwarded to RangeCollection.from_steps (e.g., length, fill).

    Returns
    -------
    An instance of this object's class whose events are the grid points,
    with an 'index_parent' column holding each parent event's index value.
    """
    # Dissolve events if requested
    events = self.dissolve().df if dissolve else self.df
    target_cols = self.keys + [self.beg, self.end]
    result_cols = target_cols + ['index_parent']

    # Iterate over events and create grid points
    pieces = []
    for row, parent in zip(events[target_cols].values, events.index.values):
        *keys, beg, end = row
        # Build grid points
        rng = RangeCollection.from_steps(beg, end, **kwargs).cut(beg, end)
        locs = np.append(rng.begs, rng.ends[-1])
        count = len(locs)
        # Assemble grid data
        parts = [
            [keys] * count,             # Event keys
            np.tile(locs, (2, 1)).T,    # Point locations (beg == end)
            [[parent]] * count,         # Parent index value
        ]
        pieces.append(np.concatenate(parts, axis=1))

    # Merge and prepare data
    grid = np.concatenate(pieces, axis=0)
    df = pd.DataFrame(data=grid, columns=result_cols, index=None)
    # Enforce data types of the source events
    source_types = {
        **events.dtypes,
        'index_parent': events.index.dtype
    }
    dtypes = {col: source_types[col] for col in df.columns}
    df = df.astype(dtypes, copy=False)
    return self.__class__(
        df,
        keys=self.keys,
        beg=self.beg,
        end=self.end,
        missing_data='ignore'
    )
def to_windows(self, dissolve=False, endpoint=False, **kwargs):
    """
    Use the events dataframe to create sliding window events of a fixed
    length and a fixed number of steps, and which fill the bounds of each
    event.

    Parameters
    ----------
    length : numerical, default 1.0
        A fixed length for all windows being defined.
    steps : int, default 1
        A number of steps per window length. The resulting step length will
        be equal to length / steps. For non-overlapped windows, use a steps
        value of 1.
    fill : {'none','cut','extend','left','right','balance'}, default 'cut'
        How to fill a gap at the end of an event's range.

        Options
        -------
        none : no window will be generated to fill the gap at the end of
            the input range.
        cut : a truncated window will be created to fill the gap with a
            length less than the full window length.
        extend : the final window will be anchored on the grid defined by
            the step value, extending beyond the window length to the right
            bound of the event.
        left : the final window will be anchored on the end of the input
            range and will extend the full window length to the left.
        right : the final window will be anchored on the grid defined by
            the step value, extending the full window length to the right,
            beyond the event's end value.
        balance : if the final range is greater than or equal to half the
            target range length, perform the cut method; if it is less,
            perform the extend method.
    dissolve : bool, default False
        Whether to dissolve the events dataframe before performing the
        transformation.
    endpoint : bool, default False
        Add a point event at the end of each event range.
    **kwargs
        Forwarded to RangeCollection.from_steps (e.g., length, steps,
        fill).

    Returns
    -------
    An instance of this object's class whose events are the windows, with
    an 'index_parent' column holding each parent event's index value.
    """
    # Dissolve events if requested
    events = self.dissolve().df if dissolve else self.df
    target_cols = self.keys + [self.beg, self.end]
    result_cols = target_cols + ['index_parent']

    # Iterate over events and create sliding window segments
    pieces = []
    for row, parent in zip(events[target_cols].values, events.index.values):
        *keys, beg, end = row
        # Build sliding window ranges
        rng = RangeCollection.from_steps(beg, end, **kwargs).cut(beg, end)
        if endpoint:
            rng = rng.append(end, end)
        count = rng.num_ranges
        # Assemble sliding window data
        parts = [
            [keys] * count,         # Event keys
            rng.rng.T,              # Window bounds
            [[parent]] * count,     # Parent index value
        ]
        pieces.append(np.concatenate(parts, axis=1))

    # Merge and prepare data
    windows = np.concatenate(pieces, axis=0)
    df = pd.DataFrame(data=windows, columns=result_cols, index=None)
    # Enforce data types of the source events
    source_types = {
        **events.dtypes,
        'index_parent': events.index.dtype
    }
    dtypes = {col: source_types[col] for col in df.columns}
    df = df.astype(dtypes, copy=False)
    return self.__class__(
        df,
        keys=self.keys,
        beg=self.beg,
        end=self.end,
        missing_data='ignore'
    )
class EventsLog(object):
    """
    High-level class for logging and managing child EventsGroups created
    within the context of a parent EventsCollection class instance. Entries
    are held in a dictionary and can be read and written with square
    bracket indexing.
    """

    def __init__(self, **kwargs):
        # Forward keyword arguments up the inheritance chain, then start
        # with an empty log
        super().__init__(**kwargs)
        self.reset()

    def __getitem__(self, key):
        # A missing key surfaces as the dictionary's own KeyError
        return self._data[key]

    def __setitem__(self, key, obj):
        # Item assignment always overwrites an existing entry
        self.log(key, obj, overwrite=True)

    @property
    def data(self):
        """
        The dictionary of logged objects.
        """
        return self._data

    @property
    def keys(self):
        """
        A list of the keys currently in the log.
        """
        return list(self._data)

    def reset(self):
        """
        Discard all logged objects, leaving an empty log.
        """
        self._data = {}

    def log(self, key, obj, overwrite=True):
        """
        Store the input events class instance within the log's data under
        the provided key.

        Parameters
        ----------
        key : hashable
            Key under which the object is stored.
        obj : object
            The object to store.
        overwrite : bool, default True
            Whether an existing entry under the same key may be replaced.
            If False and the key already exists, a ValueError is raised.
        """
        if not overwrite and key in self.data:
            raise ValueError(f"Provided key '{key}' already exists within "
                "the log.")
        self.data[key] = obj
[docs]class EventsGroup(EventsFrame):
"""
User-level class for managing linear and points events data. This class is
used for simple data sets with only a single group of events. Data is
managed using both the pandas tabular data package as well as the ranges
range data package.
EventsGroup class instances can be used for a variety of linear referencing
operations such as overlays to determine portions of events overlapped by
an input range, intersections to determine which events intersect with an
input range, length-weighted averages of event column values based on an
input range, and more.
Parameters
----------
df : pd.DataFrame
Pandas dataframe which contains linear or point events data.
beg, end : str or label
Column labels within the events dataframe which represent the linearly
referenced location of each event. For linear events both are required,
defining the begin and end location of each event. For point events,
only 'beg' is required, defining the exact location of each event (the
'end' property will automatically be set to be equal to the 'beg'
property).
geom : str or label, optional
Column label within the events dataframe which represents the shapely
geometry associated with each event if available. If provided,
certain additional class functionalities will be made available.
closed : str {'left', 'left_mod', 'right', 'right_mod', 'both',
'neither'}, optional
Whether intervals are closed on the left-side, right-side, both or
neither. If None, will default to 'left_mod' for linear events and
'both' for point events.
Options
-------
left : ranges are always closed on the left and never closed on the
right.
left_mod : ranges are always closed on the left and only closed on the
right when the next range is not consecutive.
right : ranges are always closed on the right and never closed on the
left.
right_mod : ranges are always closed on the right and only closed on
the left when the previous range is not consecutive.
both : ranges are always closed on both sides
neither : ranges are never closed on either side
"""
def __init__(self, df, beg=None, end=None, geom=None, closed=None,
        **kwargs):
    """
    Initialize the events group from a dataframe and its column labels.
    See the class documentation for a description of all parameters.
    Additional keyword arguments are forwarded to the EventsFrame
    constructor.
    """
    # Initialize EventsFrame superclass; a group never has key columns
    frame_kwargs = dict(df=df, keys=None, beg=beg, end=end, geom=geom)
    super(EventsGroup, self).__init__(**frame_kwargs, **kwargs)
    # Build the range collection before logging the closed parameter
    # (same order of operations as always used by this constructor)
    self._build_rng()
    self.closed = closed
def __getitem__(self, keys):
"""
Select from the EventsGroup instance with numerical index values or a
boolean mask. Note: selection keys must be compatible with
np.ndarray[], rangel.RangeCollection[], and pd.DataFrame.loc[]
functionality.
"""
# Retrieve selection and return new EventsGroup
@property
def rng(self):
    """
    The RangeCollection instance associated with the group's events.
    """
    return self._rng

@rng.setter
def rng(self, rng):
    # Only RangeCollection instances are accepted
    if not isinstance(rng, RangeCollection):
        raise TypeError("Input rng must be RangeCollection class "
            "instance.")
    self._rng = rng
@property
def lengths(self):
    """
    Lengths of all event ranges, as reported by the group's range
    collection.
    """
    ranges = self.rng
    return ranges.lengths
@property
def centers(self):
    """
    Centers of all event ranges, as reported by the group's range
    collection.
    """
    ranges = self.rng
    return ranges.centers
@property
def shape(self):
    """
    Shape of the group's events dataframe.
    """
    frame = self.df
    return frame.shape
def _build_rng(self):
    """
    Build the group's range collection from the begin and end columns of
    the events dataframe and store it through the ``rng`` property.
    """
    # Extract event bounds as a two-column array
    bounds = self.df[[self.beg, self.end]].values
    # Build range collection without sorting the input
    self.rng = RangeCollection.from_array(
        bounds, closed=self.closed, sort=False)
def set_closed(self, closed, inplace=False):
    """
    Change whether ranges are closed on left, right, both, or neither side.
    The request is forwarded to the parent class and then to the group's
    range collection.

    NOTE(review): the return values of both calls are discarded, so this
    method returns None even when inplace=False. Confirm against the
    parent class whether a modified copy should be returned.

    Parameters
    ----------
    closed : str {'left', 'left_mod', 'right', 'right_mod', 'both',
            'neither'}
        Whether intervals are closed on the left-side, right-side, both or
        neither.

        Options
        -------
        left : ranges are always closed on the left and never closed on the
            right.
        left_mod : ranges are always closed on the left and only closed on
            the right when the next range is not consecutive.
        right : ranges are always closed on the right and never closed on
            the left.
        right_mod : ranges are always closed on the right and only closed
            on the left when the previous range is not consecutive.
        both : ranges are always closed on both sides
        neither : ranges are never closed on either side
    inplace : boolean, default False
        Whether to perform the operation in place on the parent range
        collection, returning None.
    """
    # Call super method
    super(EventsGroup, self).set_closed(closed=closed, inplace=inplace)
    # Mirror the change on the range collection; an AttributeError raised
    # here is ignored (presumably the collection has not been built yet)
    try:
        ranges = self.rng
        ranges.set_closed(closed=closed, inplace=inplace)
    except AttributeError:
        pass
def intersecting(self, beg=None, end=None, other=None, closed='both',
        get_mask=False, **kwargs):
    """
    Retrieve a selection of records from the group of events based
    on provided begin and end locations.

    Parameters
    ----------
    beg, end : numerical or array-like, optional
        The begin and end locations of the range or ranges to be tested. If
        a single range is to be tested, provide a numeric value. If
        multiple, provide an array-like with a single begin and end value
        for each range. If no end parameter provided, point locations will
        be assumed and end will be set equal to beg. Not required if other
        parameter is used.
    other : EventsGroup, optional
        Other EventsGroup instance to be intersected with this one. Can
        be provided instead of beg, end, and closed parameters and will
        take precedence over other input.
    closed : str {'left', 'right', 'both', 'neither'}, default 'both'
        Whether input interval is closed on the left-side, right-side, both
        or neither.

        Options
        -------
        left : ranges are always closed on the left and never closed on the
            right.
        right : ranges are always closed on the right and never closed on
            the left.
        both : ranges are always closed on both sides
        neither : ranges are never closed on either side
    get_mask : bool, default False
        Whether to return a boolean mask for selecting from the events
        dataframe instead of the selection from the dataframe itself.
    **kwargs
        Additional keyword arguments passed to the range collection's
        intersecting method. The deprecated keyword ``mask`` is accepted as
        an alias of ``get_mask`` and is not passed on.

    Returns
    -------
    mask or pd.DataFrame
        The intersection mask when get_mask=True, otherwise the selected
        rows of the events dataframe. A mask with more than one dimension
        is reduced with ``any(axis=1)`` before selecting rows.

    Raises
    ------
    TypeError
        If other is provided and is not an EventsGroup instance.
    """
    # Deprecation: consume the old 'mask' keyword so that it is not
    # forwarded to the range collection as an unexpected argument
    get_mask = kwargs.pop('mask', get_mask)
    # Check for other input
    if other is not None:
        if not isinstance(other, EventsGroup):
            raise TypeError(
                "If provided, input other parameter must be valid "
                "EventsGroup instance.")
        other = other.rng
    # Intersect range
    mask = self.rng.intersecting(
        beg=beg, end=end, other=other, closed=closed, **kwargs)
    if get_mask:
        return mask
    # Collapse multi-range masks to one flag per event before selecting
    if mask.ndim > 1:
        mask = mask.any(axis=1)
    return self.df.loc[mask, :]
def overlay(self, beg=None, end=None, other=None, **kwargs):
    """
    Compute overlap of the input bounds with respect to the
    events group.

    Parameters
    ----------
    beg, end : scalar or array of scalars
        Begin and end locations of the overlaid range(s).
    other : EventsGroup, optional
        Other EventsGroup instance to be overlaid with this one. Can be
        provided instead of beg and end parameters and will take precedence
        over other input.
    normalize : boolean, default True
        Whether overlapping lengths should be normalized range length to
        give a proportional result.
    how : {'right','left','sum'}, default 'right'
        How overlapping lengths should be normalized. Only applied when
        normalize=True.

        right : Normalize overlaps by the length of each provided overlay
            range.
        left : Normalize overlaps by the length of each of the collection's
            ranges being overlaid.
        sum : Normalize overlaps by the sum of the lengths of all overlaps
            for each provided overlay range. If there are gaps in the
            collection's ranges or overlaps between the collection's
            ranges, this will allow the sum of the overlaps to still equal
            1.0, except where no overlaps occur.
    norm_zero : float, optional
        A number to substitute for instances where the normalizing factor
        (denominator) is equal to zero, e.g., when the overlay range has a
        length of zero and how='right'. If not provided, all instances of
        zero division will return float value 0.0.

    Returns
    -------
    The overlay weights computed by the group's range collection. All
    keyword arguments other than beg, end and other are forwarded to it
    unchanged.

    Raises
    ------
    TypeError
        If other is provided and is not an EventsGroup instance.
    """
    if other is not None:
        # Validate the other group and adopt its bounds
        if not isinstance(other, EventsGroup):
            raise TypeError(
                "If provided, input other parameter must be valid "
                "EventsGroup instance.")
        other_rng = other.rng
        beg, end = other_rng.begs, other_rng.ends
    # Compute range overlaps
    return self.rng.overlay(beg=beg, end=end, **kwargs)
def overlay_average(self, beg=None, end=None, cols=None, weighted=True,
        zeroweight=None, how='right', weights=None,
        suffix='_average', **kwargs):
    """
    Compute the weighted average of a selection of events columns based on
    the overlap of the input bounds with respect to linear events.

    Parameters
    ----------
    beg : float
        Beginning milepost of the overlaid segment.
    end : float
        Ending milepost of the overlaid segment.
    cols : list
        List of column labels to aggregate.
    weighted : boolean, default True
        Whether the computed average should be weighted. If False, an
        un-weighted average will be computed, giving all intersecting
        values an equal weight.
    zeroweight : default None
        If weights sum to zero, how to compute average. If None, an
        un-weighted average of all events in the group will be computed.
        Else, no average will be computed and the input value will be
        returned instead.
    how : {'right','left','sum'}, default 'right'
        How overlapping lengths should be normalized when weights are
        computed by this method (weighted=True and no weights provided).

        Options
        -------
        right : Normalize overlaps by the length of each provided overlay
            range.
        left : Normalize overlaps by the length of each of the collection's
            event ranges.
        sum : Normalize overlaps by the sum of the lengths of all overlaps
            for each provided overlay range. If there are gaps in the
            collection's event ranges or overlaps between the collection's
            ranges, this will allow the sum of the overlaps to still equal
            1.0, except where no overlaps occur.
    weights : np.ndarray
        An array of length-normalized overlay weights; if excluded,
        weights will be computed based on given mileposts and parameters;
        if multiple overlay computations are being conducted, computing
        the weights separately and then inputting them directly into the
        aggregation functions will produce time savings. Provided weights
        are used as given.
    suffix : str, default '_average'
        Text appended to each column label to build the index of the
        returned series when more than one column is aggregated.
    **kwargs
        Additional keyword arguments passed to the overlay method when
        weights are computed by this method.

    Returns
    -------
    scalar or pd.Series
        A single value when one column is aggregated, otherwise a series
        with one value per column.
    """
    # Validate inputs
    cols = self._validate_cols(cols=cols, require=True)
    # Compute weights
    if weights is None:
        if weighted:
            weights = self.overlay(
                beg, end, normalize=True, how=how, **kwargs).values
        else:
            # NOTE(review): is_intersecting is not defined in this part of
            # the file; presumably it is provided by EventsFrame - confirm.
            hits = self.is_intersecting(beg, end, any_=False) * 1
            # Give every intersecting event the same share so that the
            # weighted sum below is an average instead of a plain sum
            n_hits = hits.sum()
            weights = hits / n_hits if n_hits > 0 else hits
    # Aggregate selected columns
    res = []
    for col in cols:
        vals = self.df[col].values
        if len(vals) == 0:
            avg = np.nan
        # If weights are available, calculate weighted average
        elif len(weights) > 0 and weights.sum() > 0:
            avg = (vals * weights).sum()
        # If weights are not available, use substitute
        elif zeroweight is None:
            avg = vals.sum() / len(vals)
        else:
            avg = zeroweight
        # Log computed averages
        res.append(avg)
    # Process results
    if len(cols) == 1:
        return res[0]
    return pd.Series(data=res, index=[str(col) + suffix for col in cols])
def overlay_sum(self, beg=None, end=None, cols=None, weighted=True,
        weights=None, suffix='_sum', **kwargs):
    """
    Compute the weighted sum of a selection of events columns based on
    the overlap of the input bounds with respect to route events.

    Parameters
    ----------
    beg : float
        Beginning milepost of the overlaid segment.
    end : float
        Ending milepost of the overlaid segment.
    cols : list
        List of column labels to aggregate.
    weighted : boolean, default True
        Whether the computed sum should be weighted. If False, an
        un-weighted sum will be computed, giving all intersecting values an
        equal weight.
    weights : np.ndarray
        An array of length-normalized overlay weights; if excluded,
        weights will be computed based on given mileposts and parameters;
        if multiple overlay computations are being conducted, computing
        the weights separately and then inputting them directly into the
        aggregation functions will produce time savings.
    suffix : str, default '_sum'
        Text appended to each column label to build the index of the
        returned series when more than one column is aggregated.
    **kwargs
        Additional keyword arguments passed to the overlay method when
        weights are computed by this method.

    Returns
    -------
    scalar or pd.Series
        A single value when one column is aggregated, otherwise a series
        with one value per column.
    """
    # Validate inputs
    cols = self._validate_cols(cols=cols, require=True)
    # Compute weights
    if weights is None:
        if weighted:
            overlaps = np.asarray(
                self.overlay(beg, end, normalize=False, **kwargs).values,
                dtype=float)
            lengths = np.asarray(self.lengths, dtype=float)
            # Normalize overlaps by event length. Zero-length events get a
            # weight of zero instead of NaN, which would otherwise turn the
            # sum of weights into NaN and force every result to zero.
            weights = np.divide(
                overlaps, lengths,
                out=np.zeros(np.broadcast(overlaps, lengths).shape),
                where=lengths != 0)
        else:
            # NOTE(review): is_intersecting is not defined in this part of
            # the file; presumably it is provided by EventsFrame - confirm.
            weights = self.is_intersecting(beg, end) * 1
    # Aggregate selected columns
    res = []
    for col in cols:
        vals = self._df[col].values
        if len(vals) == 0:
            sum_ = np.nan
        # If weights are available, calculate sum
        elif len(weights) > 0 and weights.sum() > 0:
            sum_ = (vals * weights).sum()
        # If weights are not available, assume zero
        else:
            sum_ = 0
        # Log computed sums
        res.append(sum_)
    # Process results
    if len(cols) == 1:
        return res[0]
    return pd.Series(data=res, index=[str(col) + suffix for col in cols])
def overlay_most(self, beg=None, end=None, cols=None, weights=None,
        suffix='_most', **kwargs):
    """
    Compute the most represented values of a selection of events columns
    based on the overlap of the input bounds with respect to route events.

    Parameters
    ----------
    beg : float
        Beginning milepost of the overlaid segment.
    end : float
        Ending milepost of the overlaid segment.
    cols : list
        List of column labels to aggregate.
    weights : pd.Series
        A series of length-normalized overlay weights; if excluded,
        weights will be computed based on given mileposts and parameters;
        if multiple overlay computations are being conducted, computing
        the weights separately and then inputting them directly into the
        aggregation functions will produce time savings.
    suffix : str, default '_most'
        Text appended to each column label to build the index of the
        returned series when more than one column is aggregated.
    **kwargs
        Accepted but not used by this method.

    Returns
    -------
    scalar or pd.Series
        A single value when one column is aggregated, otherwise a series
        with one value per column. Empty groups produce np.nan values.
    """
    # Validate inputs
    cols = self._validate_cols(cols=cols, require=True)
    single = len(cols) == 1
    labels = [str(col) + suffix for col in cols]
    # Empty groups have no values to choose from
    if self.shape[0] == 0:
        return np.nan if single else pd.Series(data=np.nan, index=labels)
    # Compute weights
    if weights is None:
        weights = self.overlay(beg, end, normalize=True, how='right')
    # For each column, pick the unique value with the largest total weight
    winners = []
    for col in cols:
        vals = self.df[col].values
        candidates = np.unique(vals)
        scores = [
            np.where(vals == val, weights, 0).sum() for val in candidates
        ]
        winners.append(candidates[np.argmax(scores)])
    # Process results
    if single:
        return winners[0]
    return pd.Series(data=winners, index=labels)
[docs]class EventsCollection(EventsFrame):
"""
User-level class for managing linear and points events data. This class is
used for complex data sets with multiple groups of events, grouped by at
least one key column (e.g., route ID). Data is managed using both the
pandas tabular data package as well as the ranges range data package.
EventsCollection class instances represent complex events data sets with
multiple groups of events which are distinguished by at least one set of
keys (e.g., years of data or inventory categories). These collections can
be used for a variety of linear referencing operations and events
manipulations, such as dissolves based on a subset of events columns,
returning a simplified data set with a selection of columns aggregated.
Additionally, these collections can be used to perform automated
intersections with another EventsCollection class instance using the
merge() method, retrieving column data from another collection and
relating it to the original collection's events data.
EventsCollection class instances can be queried using the get_subset() and
get_group() methods, returning a pared down EventsCollection or a specific
EventsGroup, respectively. Similarly, this can be done using object
indexing, passing a mixture of unique values and valid slices of unique
key values to return a subset of the collection as an EventsCollection
instance, or just unique key values to return a unique group as an
EventsGroup instance.
Parameters
----------
df : pd.DataFrame
Pandas dataframe which contains linear or point events data.
keys : list or tuple
A list or tuple of dataframe column labels which define the unique
groups of events within the events dataframe. Common examples include
year or route ID columns which distinguish unrelated sets of events
within the events dataframe.
beg, end : str or label
Column labels within the events dataframe which represent the linearly
referenced location of each event. For linear events both are required,
defining the begin and end location of each event. For point events,
only 'beg' is required, defining the exact location of each event (the
'end' property will automatically be set to be equal to the 'beg'
property).
geom : str or label, optional
Column label within the events dataframe which represents the shapely
geometry associated with each event if available. If provided,
certain additional class functionalities will be made available.
closed : str {'left', 'left_mod', 'right', 'right_mod', 'both',
'neither'}, optional
Whether intervals are closed on the left-side, right-side, both or
neither. If None, will default to 'left_mod' for linear events and
'both' for point events.
Options
-------
left : ranges are always closed on the left and never closed on the
right.
left_mod : ranges are always closed on the left and only closed on the
right when the next range is not consecutive.
right : ranges are always closed on the right and never closed on the
left.
right_mod : ranges are always closed on the right and only closed on
the left when the previous range is not consecutive.
both : ranges are always closed on both sides
neither : ranges are never closed on either side
sort : bool, default False
Whether to sort the events dataframe by its keys and begin and end
values upon its creation.
missing_data : {'ignore','drop','warn','raise'}, default 'warn'
What to do when the input dataframe contains missing values in the
target key, beg, and end columns.
Options
-------
ignore : do nothing.
drop : drop all records which contain any missing data in the target
columns.
warn : log a warning when records are missing data.
raise : raise a ValueError when records are missing data.
"""
def __init__(self, df, keys=None, beg=None, end=None,
        geom=None, closed=None, sort=False, missing_data='warn', **kwargs):
    """
    Initialize the events collection from a dataframe and its column
    labels. See the class documentation for a description of all
    parameters. Additional keyword arguments are forwarded to the
    EventsFrame constructor.
    """
    # Collections require keys; key-less data belongs in an EventsGroup
    if keys is None:
        raise Exception("If no keys are required to define unique groups "
            "of events, please use the EventsGroup class instead of the "
            "EventsCollection class.")
    # Initialize EventsFrame superclass
    frame_kwargs = dict(
        df=df, keys=keys, beg=beg, end=end, geom=geom, sort=sort)
    super(EventsCollection, self).__init__(**frame_kwargs, **kwargs)
    # Log input values
    self.closed = closed
    # Address missing data in the target columns as requested
    self._check_missing_data(missing_data=missing_data)
    # Create events log for built groups
    self.log = EventsLog()
def __getitem__(self, keys):
# Determine type of retrieval - single group or filter slice
if isinstance(keys, tuple):
if any(isinstance(key, slice) for key in keys):
# Partial slice
return self.get_subset(keys)
else:
# Single group
return self.get_group(keys, empty=False)
else:
if isinstance(keys, slice):
# Partial slice
return self.get_subset(keys)
else:
# Single group
return self.get_group(keys, empty=False)
def _initialize_df(self):
"""
Class-specific dataframe initialization processes.
"""
self._empty_df = pd.DataFrame(columns=self.columns)
self._empty_group = self._build_group(self._empty_df.copy())
def _check_missing_data(self, missing_data='warn'):
"""
Check for missing data in keys, beg, end, and geometry fields. Warn
user when target fields contain null data.
"""
# If ignore
if missing_data=='ignore':
return
elif missing_data in ['warn','raise','drop']:
# Find, count missing data records
mask = self.df[self.targets].isna().any(axis=1)
count = mask.sum()
# Address if more than one records contain missing data
if count > 0:
# Drop records
if missing_data=='drop':
self.df = self.df[~mask].copy()
return
# Warn or raise error
else:
# Prepare message
message = (
f"Input events dataframe has {count:,.0f} records "
"with missing data in target columns. This may cause "
"unexpected behaviors.")
if missing_data=='raise':
raise ValueError(message)
else:
warnings.warn(message)
return
else:
raise ValueError(
"Invalid input missing_data parameter. Must be one of "
"('ignore','drop','warn','raise').")
def from_similar(self, df, **kwargs):
    """
    Create an EventsCollection from the input dataframe, assuming the same
    column labels and closed parameter as the calling collection.
    Additional constructor keyword arguments can be passed through
    **kwargs.

    Parameters
    ----------
    df : pd.DataFrame
        Pandas dataframe which contains linear or point events data, using
        the same key, begin, end, and geometry labels as the calling
        collection.
    **kwargs
        Additional keyword arguments to be passed to the constructor of the
        calling collection's class. Values provided here take precedence
        over those copied from the calling collection.
    """
    # Start from the calling collection's parameters
    params = {
        'keys': self.keys,
        'beg': self.beg,
        'end': self.end,
        'geom': self.geom,
        'closed': self.closed,
    }
    # Explicit keyword arguments override the copied parameters
    params.update(kwargs)
    # Build the events collection
    return self.__class__(df, **params)
@classmethod
def from_standard(cls, df, require_end=False, **kwargs):
    """
    Create an EventsCollection from the input dataframe assuming standard
    column labels. These standard labels can be modified on the class
    directly by modifying the associated class attributes:
    - default_keys
    - default_beg
    - default_end
    - default_geom

    Standard labels include:
    keys : 'RID', 'YEAR', 'KEY'
    beg : 'BMP', 'BEG', 'FROM'
    end : 'EMP', 'END', 'TO'
    geom : 'geometry'

    Additional constructor keyword arguments can be passed through
    **kwargs.

    Parameters
    ----------
    df : pd.DataFrame
        Pandas dataframe which contains linear or point events data,
        formatted with standard labels. If multiple keys are detected, they
        will be assigned in the order in which they appear within the
        target dataframe. Only one of each begin and end option may be
        used. The geometry label is optional.
    require_end : bool, default False
        Whether to raise an error if no valid unique end column label is
        found. If False, no end label will be used when generating the
        collection.
    **kwargs
        Additional keyword arguments to be passed to the EventsCollection
        constructor.

    Raises
    ------
    ValueError
        If more than one standard label is found for the beg, end, or geom
        parameter, if no standard beg label is found, or if no standard end
        label is found and require_end=True.
    """
    # Standard labels for the parameters which accept only a single column,
    # checked in this order for every column
    standards = {
        'beg': cls.default_beg,
        'end': cls.default_end,
        'geom': cls.default_geom,
    }
    found = dict.fromkeys(standards)
    keys = []
    # Check for standard label assignments
    for col in df.columns:
        # Check for key labels
        if col in cls.default_keys:
            keys.append(col)
        # Check for other labels
        for param, labels in standards.items():
            if col not in labels:
                continue
            if found[param] is not None:
                raise ValueError(
                    "There is more than one standard label in the "
                    f"provided dataframe for the '{param}' parameter. "
                    f"Standard labels include {labels}.")
            found[param] = col
    # Check for missing labels
    if found['beg'] is None:
        raise ValueError("No standard label provided for the 'beg' "
            f"parameter. Standard labels include {cls.default_beg}.")
    if found['end'] is None and require_end:
        raise ValueError("No standard label provided for the 'end' "
            f"parameter. Standard labels include {cls.default_end}.")
    # Build the events collection
    return cls(
        df, keys=keys, beg=found['beg'], end=found['end'],
        geom=found['geom'], **kwargs)
@property
def log(self):
    """
    The EventsLog instance which stores the groups built by the collection.
    """
    return self._log

@log.setter
def log(self, obj):
    # Only EventsLog instances are accepted
    if isinstance(obj, EventsLog):
        self._log = obj
    else:
        raise TypeError("Log must be EventsLog class instance.")
def _build_empty(self):
return self._empty_group.copy(deep=True)
def reset_log(self):
    """
    Reset the log of built events groups.

    The existing log object is emptied in place through its reset() method
    so that the ``log`` property keeps returning an EventsLog instance, as
    enforced by its setter, instead of being replaced by a plain
    dictionary.
    """
    # Reset log
    self.log.reset()
def _validate_keys(self, keys):
"""
Validate the input list or tuple of keys to determine if it is a valid
query for the collection's collection dictionary.
"""
# Validate input keys
if self.num_keys == 0:
if not keys is None:
raise ValueError(
"No keys defined in the collection to be queried.")
elif self.num_keys == 1:
if isinstance(keys, list) or isinstance(keys, tuple):
keys = keys[0]
elif self.num_keys > 1:
if not isinstance(keys, list) and not isinstance(keys, tuple):
raise TypeError("Input keys information must be provided as a "
"list or tuple with a length equal to self.num_keys.")
elif len(keys) != self.num_keys:
raise ValueError("Must provide a number of keys which is "
"equal to the number of keys defined in the collection "
f"({self.num_keys} required, {len(keys)} provided).")
keys = tuple(keys)
# Return validated keys
return keys
def overlay_average(self, other, cols=None, **kwargs):
    """
    Compute overlay averages of columns from another collection for every
    event in this collection. For each record, the group of the other
    collection which shares the record's key values is retrieved and its
    overlay_average() method is called with the record's begin and end
    locations.

    Parameters
    ----------
    other : EventsCollection
        Collection from which column data will be retrieved. Must be an
        instance of the calling collection's class and have the same number
        of keys.
    cols : list
        Column labels within the other collection to be averaged. At least
        one column is required.
    **kwargs
        Additional keyword arguments passed to the overlay_average() method
        of each retrieved events group.

    Returns
    -------
    pd.DataFrame
        One row per event in this collection, indexed like this
        collection's dataframe, with one column per requested label. Rows
        are filled with np.nan when a KeyError is raised while retrieving
        or overlaying the matching group (e.g., no matching group exists).

    Raises
    ------
    TypeError
        If other is not an instance of the calling collection's class.
    ValueError
        If the collections have a different number of keys or no columns
        are provided.
    """
    # Validate input
    # - Input events
    if not isinstance(other, self.__class__):
        raise TypeError(f"Input 'other' must be {self.__class__.__name__} "
            "type.")
    # - Same number of keys
    if not self.num_keys == other.num_keys:
        raise ValueError("Other collection must have the same number of "
            "keys as the target collection.")
    # - Input retrieval columns
    cols = other._validate_cols(cols)
    if len(cols) == 0:
        raise ValueError("At least one retrieve column must be provided.")

    # Prepare for retrieval
    def _apply_retrieve(r):
        try:
            # Retrieve corresponding events group
            group_key = tuple(r[loc] for loc in self.key_locs)
            eg = other.get_group(group_key, empty=False)
            # Overlay with record bounds
            res = eg.overlay_average(
                r[self.beg_loc], r[self.end_loc], cols=cols, **kwargs)
        except KeyError:
            return [np.nan] * len(cols)
        # A series is returned for multiple columns and a single value for
        # one column; normalize both to a flat list of values
        if isinstance(res, pd.Series):
            return res.tolist()
        if isinstance(res, (list, tuple, np.ndarray)):
            return list(res)
        return [res]

    # Perform overlays
    rows = [_apply_retrieve(r) for r in self.df.values]
    # Return retrieved column data
    return pd.DataFrame(rows, columns=cols, index=self.df.index)
def merge(self, other):
    """
    Create an EventsMerge instance with this collection as the left and the
    other collection as the right. This can then be used to retrieve
    attributes from the other collection to be appended to this
    collection's dataframe.

    Parameters
    ----------
    other : EventsCollection
        Another events collection with similar keys which will be merged
        with this events collection, producing an EventsMerge instance
        which can be used to perform various overlay operations to retrieve
        attributes and more from the target collection.
    """
    # Create and return merge
    return EventsMerge(self, other)
def project_parallel(self, other, samples=3, buffer=100, match='all',
        choose=1, sort_locs=True, **kwargs):
    """
    Project an input geodataframe of linear geometries onto parallel events
    in the events dataframe, producing linearly referenced locations for all
    input geometries which are found to be parallel based on buffer and
    sampling parameters.

    Parameters
    ----------
    other : gpd.GeoDataFrame
        Geodataframe containing linear geometry which will be projected
        onto the events dataframe.
    samples : int, default 3
        The number of equidistant sample points to take along each geometry
        being projected to check for nearby geometry.
    buffer : float, default 100
        The max distance to search for input geometries to project against
        the events' geometries. Measured in terms of the geometries'
        coordinate reference system.
    match : {'all', int}, default 'all'
        How many sample points must find a nearby target event to produce a
        positive match to that event, resulting in a projection.
    choose : {int, 'all'}, default 1
        How many target geometries to choose when more than one match
        occurs.
    sort_locs : bool, default True
        Whether begin and end location values should be sorted, ensuring
        that all events are increasing and monotonic.
    **kwargs
        Keyword arguments to be passed to the EventsCollection constructor
        upon completion of the projection.

    Returns
    -------
    EventsCollection
        A new collection built from the matched projection results, using
        this collection's keys, begin and end labels, and closed parameter,
        with missing data checks disabled.
    """
    # Create projector
    projector = ParallelProjector(
        self, other, samples=samples, buffer=buffer)
    # Perform match
    matched = projector.match(
        match=match, choose=choose, sort_locs=sort_locs)
    # Return results in new events collection
    return EventsCollection(
        matched,
        keys=self.keys,
        beg=self.beg,
        end=self.end,
        closed=self.closed,
        missing_data='ignore',
        **kwargs
    )
def get_group(self, keys, empty=True, log_empty=True,
        **kwargs) -> EventsGroup:
    """
    Retrieve a unique group of events based on provided key values. Built
    groups are stored in the collection's log and reused on later calls.

    Parameters
    ----------
    keys : key value, tuple of key values, or list of the same
        If only one key column is defined within the collection, a single
        column value may be provided. Otherwise, a tuple of column values
        must be provided in the same order as they appear in self.keys.
    empty : bool, default True
        Whether to allow for empty events groups to be returned when the
        provided keys are valid but are not associated with any actual
        events. If False, these cases will raise a KeyError.
    log_empty : bool, default True
        Whether created empty events should be logged and stored within
        the collection to allow for quicker access. More memory intensive
        but may produce moderate performance improvements if empty keys
        will be accessed repeatedly.
    **kwargs
        Accepted but not used by this method.

    Raises
    ------
    KeyError
        If no events are associated with the keys and empty=False.
    ValueError
        If an AttributeError is raised while building the group (reported
        as no group keys being defined).
    """
    keys = self._validate_keys(keys)
    # Attempt to retrieve from log
    try:
        return self.log[keys]
    except KeyError:
        # Not logged yet, build the group below
        pass
    # Attempt to retrieve dataframe to create new group
    try:
        group = self._build_group(self._groups.get_group(keys))
    except KeyError:
        # Invalid group keys (i.e., empty group)
        if not empty:
            raise KeyError(
                f"Invalid EventsCollection keys: {keys}")
        group = self._build_empty()
        if log_empty:
            self.log[keys] = group
    except AttributeError:
        # Collection is None (i.e., no defined keys)
        raise ValueError("No defined group keys.")
    else:
        # Add built group to log
        self.log[keys] = group
    return group
def get_subset(self, keys, reduce=True, **kwargs):
    """
    Retrieve a subset of the events collection based on the provided key
    values or slices. Returned events must satisfy all keys.

    Parameters
    ----------
    keys : list or tuple of slice, list, or other
        A list of either (1) slices which can be used to slice the key
        values present in self.key_values for the associated key, (2) a
        list of values which reflect those in self.key_values, or (3) a
        single value which is present in self.key_values. Inputs must be
        provided in the same order as they appear in self.keys. For
        collections with a single key, the lone slice, list, or value may
        also be provided directly.
    reduce : bool, default True
        Whether to simplify the resulting EventsCollection by removing any
        keys which are queried for a single value and become obsolete.
        For example, if one key represents years of data and a single year
        is provided, that key will be removed from the resulting collection
        as it can no longer be queried further.
    **kwargs
        Accepted but not used by this method.

    Returns
    -------
    EventsCollection
        A new instance of the calling collection's class containing only
        the selected events, with missing data checks disabled.

    Raises
    ------
    ValueError
        If a key cannot be filtered with the provided value or the subset
        collection cannot be created. The original error is attached as the
        cause.
    """
    # Determine filtering
    keys = self._validate_keys(keys)
    # Key validation unwraps the input for collections with zero or one
    # key; wrap it again so it pairs with self.keys one-to-one instead of
    # being iterated itself (e.g., a string character by character)
    if self.num_keys <= 1:
        keys = (keys,)
    key_values = self.key_values
    mask = pd.Series(data=True, index=self.df.index)
    new_keys = []
    for key, val in zip(self.keys, keys):
        # Determine input type and perform filter
        try:
            if isinstance(val, slice):
                new_keys.append(key)
                mask &= self.df[key].isin(key_values[key][val])
            elif isinstance(val, (list, np.ndarray)):
                new_keys.append(key)
                mask &= self.df[key].isin(val)
            else:
                # Single values make the key obsolete when reducing
                if not reduce:
                    new_keys.append(key)
                mask &= self.df[key] == val
        except Exception as err:
            raise ValueError(f"Unable to filter key '{key}' based on "
                f"provided input value {val}.") from err
    # Produce filtered collection
    df = self.df.loc[mask, :]
    try:
        ec = self.__class__(
            df, keys=new_keys, beg=self.beg, end=self.end,
            geom=self.geom, closed=self.closed, missing_data='ignore')
    except Exception as err:
        raise ValueError(
            "Unable to produce EventsCollection subset due to unknown "
            "error.") from err
    return ec
def get_matching(self, other, **kwargs):
    """
    Retrieve a subset of the events collection based on the unique group
    values present in another provided events collection.

    NOTE(review): the unique group keys of the other collection are passed
    to get_group(), which retrieves a single group, although a subset of
    the collection is described here - confirm the intended behavior.

    Parameters
    ----------
    other : EventsCollection
        Another events collection with matching keys which will be used to
        select a subset of this events collection based on its key values.
    **kwargs
        Accepted but not used by this method.
    """
    # Collect the unique group keys of the other collection
    target_keys = other.group_keys_unique
    # Get subset of groups
    return self.get_group(target_keys, empty=True)
def _build_group(self, df):
    """
    Build an EventsGroup from the input dataframe, which should be a subset
    of the events collection's dataframe.

    Parameters
    ----------
    df : pd.DataFrame
        Dataframe passed to the EventsGroup constructor together with this
        collection's ``beg``, ``end``, ``geom``, and ``closed`` settings.

    Returns
    -------
    EventsGroup

    Raises
    ------
    Exception
        Any exception raised by the EventsGroup constructor is re-raised
        unchanged.
    """
    try:
        return EventsGroup(
            df=df, beg=self.beg, end=self.end, geom=self.geom,
            closed=self.closed)
    except Exception:
        # Diagnostic aid: show the offending dataframe when a ``display``
        # callable is available (e.g., in an IPython/Jupyter session). In a
        # plain interpreter the name does not exist, and the resulting
        # NameError must not mask the real construction error.
        try:
            display(df)
        except NameError:
            pass
        # Bare raise keeps the original exception and traceback intact
        raise
####################
# COMMON FUNCTIONS #
####################
def from_standard(df, require_end=False, **kwargs):
    """
    Build an EventsCollection from a dataframe that uses standard column
    labels. This is a module-level shortcut for
    ``EventsCollection.from_standard``.

    The standard labels can be changed on the class directly by modifying
    the associated class attributes:
    - default_keys
    - default_beg
    - default_end
    - default_geom

    Standard labels include:
        keys : 'RID', 'YEAR', 'KEY'
        beg  : 'BMP', 'BEG', 'FROM'
        end  : 'EMP', 'END', 'TO'
        geom : 'geometry'

    Parameters
    ----------
    df : pd.DataFrame
        Pandas dataframe containing linear or point events data, formatted
        with standard labels. When multiple keys are detected, they are
        assigned in the order in which they appear within the dataframe.
        Only one begin option and one end option may be used. The geometry
        label is optional.
    require_end : bool, default False
        Whether to raise an error if no valid unique end column label is
        found. If False, no end label will be used when generating the
        collection.
    **kwargs
        Additional keyword arguments forwarded to the EventsCollection
        constructor.

    Returns
    -------
    EventsCollection
    """
    # Delegate entirely to the class-level constructor
    return EventsCollection.from_standard(
        df, require_end=require_end, **kwargs)
def check_compatibility(objs, errors='raise', **kwargs):
    """
    Check whether the input EventsCollections are all compatible for
    merging, unifying, or similar relational processes.

    Parameters
    ----------
    objs : list-like of EventsCollections
        EventsCollection objects to be tested against each other. Must
        support ``len()``, iteration, indexing, and slicing.
    errors : {'raise','ignore'}
        How to respond when an incompatibility is found. With 'raise', the
        descriptive error is raised. Any other value is treated as
        'ignore' and the function returns False instead.
    **kwargs
        Accepted for signature compatibility; not used by this function.

    Returns
    -------
    bool
        True if all objects are compatible; False if they are not and
        errors are being ignored.

    Raises
    ------
    ValueError
        If ``objs`` is empty (raised regardless of ``errors``), or if the
        objects do not all have the same number of keys.
    TypeError
        If any object is not an EventsCollection.
    """
    # Validation uses explicit conditionals instead of ``assert`` so the
    # checks remain active when Python runs with optimizations (-O).

    # Ensure minimum objects provided; this check is never suppressed
    if len(objs) == 0:
        raise ValueError("Must provide at least one object for testing.")

    try:
        # Ensure type
        if not all(isinstance(obj, EventsCollection) for obj in objs):
            raise TypeError("All input objects must be EventsCollections.")
        # Ensure matching keys, using the first object as the reference
        num_keys = objs[0].num_keys
        if any(obj.num_keys != num_keys for obj in objs[1:]):
            raise ValueError(
                "All input objects must have the same number of keys.")
    except Exception:
        if errors == 'raise':
            # Bare raise preserves the original exception and traceback
            raise
        return False
    return True
#####################
# LATE DEPENDENCIES #
#####################
from linref.events.merge import EventsMerge, EventsMergeAttribute
from linref.events.spatial import ParallelProjector