Skip to content

ENH: merge_asof() has type specializations and can take multiple 'by' parameters (#13936) #14783

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 12 commits into from
13 changes: 13 additions & 0 deletions asv_bench/benchmarks/join_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,12 +302,19 @@ def setup(self):
self.df1 = self.df1.sort_values('time')
self.df2 = self.df2.sort_values('time')

self.df1['time32'] = np.int32(self.df1.time)
self.df2['time32'] = np.int32(self.df2.time)

self.df1a = self.df1[['time', 'value1']]
self.df2a = self.df2[['time', 'value2']]
self.df1b = self.df1[['time', 'key', 'value1']]
self.df2b = self.df2[['time', 'key', 'value2']]
self.df1c = self.df1[['time', 'key2', 'value1']]
self.df2c = self.df2[['time', 'key2', 'value2']]
self.df1d = self.df1[['time32', 'value1']]
self.df2d = self.df2[['time32', 'value2']]
self.df1e = self.df1[['time', 'key', 'key2', 'value1']]
self.df2e = self.df2[['time', 'key', 'key2', 'value2']]

def time_noby(self):
    # benchmark: asof merge on the 'time' column only, no 'by' grouping
    left, right = self.df1a, self.df2a
    merge_asof(left, right, on='time')
Expand All @@ -318,6 +325,12 @@ def time_by_object(self):
def time_by_int(self):
    # benchmark: asof merge grouped by the integer-typed 'key2' column
    merge_asof(left=self.df1c, right=self.df2c, on='time', by='key2')

def time_on_int32(self):
    # benchmark: asof merge where the 'on' key is int32 rather than int64
    frames = (self.df1d, self.df2d)
    merge_asof(*frames, on='time32')

def time_multiby(self):
    # benchmark: match on two 'by' columns before the nearest-key search
    by_cols = ['key', 'key2']
    merge_asof(self.df1e, self.df2e, on='time', by=by_cols)


#----------------------------------------------------------------------
# data alignment
Expand Down
10 changes: 10 additions & 0 deletions doc/source/whatsnew/v0.19.2.txt
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,13 @@ Bug Fixes
- Explicit check in ``to_stata`` and ``StataWriter`` for out-of-range values when writing doubles (:issue:`14618`)

- Bug in ``unstack()`` where, if called with a list of column(s) as an argument, the columns were coerced to ``object`` regardless of their dtypes (:issue:`11847`)


.. _whatsnew_0192.enhancements.other:

Other enhancements
^^^^^^^^^^^^^^^^^^

- ``pd.merge_asof()`` can take multiple columns in the ``by`` parameter and has dtype specializations for better performance (:issue:`13936`)


27 changes: 25 additions & 2 deletions pandas/src/joins_func_helper.pxi.in
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# cython: boundscheck=False, wraparound=False
"""
Template for each `dtype` helper function for hashtable

Expand All @@ -14,7 +15,9 @@ WARNING: DO NOT edit .pxi FILE directly, .pxi is generated from .pxi.in
by_dtypes = [('PyObjectHashTable', 'object'), ('Int64HashTable', 'int64_t')]

# on_dtype
on_dtypes = ['int64_t', 'double']
on_dtypes = ['uint8_t', 'uint16_t', 'uint32_t', 'uint64_t',
'int8_t', 'int16_t', 'int32_t', 'int64_t',
'float', 'double']

}}

Expand Down Expand Up @@ -98,7 +101,9 @@ def asof_join_{{on_dtype}}_by_{{by_dtype}}(ndarray[{{on_dtype}}] left_values,
{{py:

# on_dtype
dtypes = ['int64_t', 'double']
dtypes = ['uint8_t', 'uint16_t', 'uint32_t', 'uint64_t',
'int8_t', 'int16_t', 'int32_t', 'int64_t',
'float', 'double']

}}

Expand Down Expand Up @@ -158,3 +163,21 @@ def asof_join_{{on_dtype}}(ndarray[{{on_dtype}}] left_values,

{{endfor}}


#----------------------------------------------------------------------
# stringify
#----------------------------------------------------------------------

def stringify(ndarray[object, ndim=2] xt):
    """
    Convert each row of a 2-D object array into its raw-bytes string.

    Used by merge_asof() when the 'by' parameter has multiple columns:
    each composite key is a row of the array, and ndarrays are not
    hashable, so every row is collapsed into a bytes object that can be
    stored in a hash table.
    """
    cdef:
        Py_ssize_t i, n  # declare the loop index so the loop stays a C loop
        ndarray[object] result

    n = len(xt)
    # 'np.object' is a deprecated alias; the builtin 'object' is equivalent
    result = np.empty(n, dtype=object)

    for i in range(n):
        # tostring() yields the row's raw bytes -- a hashable, cheap key
        result[i] = xt[i].tostring()

    return result

89 changes: 52 additions & 37 deletions pandas/tools/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
is_list_like,
_ensure_int64,
_ensure_float64,
_ensure_object)
_ensure_object,
_get_dtype)
from pandas.types.missing import na_value_for_dtype

from pandas.core.generic import NDFrame
Expand Down Expand Up @@ -270,8 +271,8 @@ def merge_asof(left, right, on=None,
DataFrame whose 'on' key is less than or equal to the left's key. Both
DataFrames must be sorted by the key.

Optionally perform group-wise merge. This searches for the nearest match
on the 'on' key within the same group according to 'by'.
Optionally match on equivalent keys with 'by' before searching for nearest
match with 'on'.

.. versionadded:: 0.19.0

Expand All @@ -288,9 +289,8 @@ def merge_asof(left, right, on=None,
Field name to join on in left DataFrame.
right_on : label
Field name to join on in right DataFrame.
by : column name
Group both the left and right DataFrames by the group column; perform
the merge operation on these pieces and recombine.
by : column name or list of column names
Match on these columns before performing merge operation.
suffixes : 2-length sequence (tuple, list, ...)
Suffix to apply to overlapping column names in the left and right
side, respectively
Expand Down Expand Up @@ -926,27 +926,44 @@ def get_result(self):
return result


_asof_functions = {
'int64_t': _join.asof_join_int64_t,
'double': _join.asof_join_double,
}
def _asof_function(on_type):
    """ Look up the dtype-specialized asof join function, or None. """
    name = 'asof_join_%s' % on_type
    return getattr(_join, name, None)


def _asof_by_function(on_type, by_type):
    """ Look up the dtype-specialized asof-join-by function, or None. """
    name = 'asof_join_%s_by_%s' % (on_type, by_type)
    return getattr(_join, name, None)

_asof_by_functions = {
('int64_t', 'int64_t'): _join.asof_join_int64_t_by_int64_t,
('double', 'int64_t'): _join.asof_join_double_by_int64_t,
('int64_t', 'object'): _join.asof_join_int64_t_by_object,
('double', 'object'): _join.asof_join_double_by_object,
}

# map a Cython C type name to the pandas coercion helper that converts
# an ndarray to the corresponding dtype before the specialized join call
_type_casters = {
    'int64_t': _ensure_int64,
    'double': _ensure_float64,
    'object': _ensure_object,
}

_cyton_types = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_cython_types ?

'uint8': 'uint8_t',
'uint32': 'uint32_t',
'uint16': 'uint16_t',
'uint64': 'uint64_t',
'int8': 'int8_t',
'int32': 'int32_t',
'int16': 'int16_t',
'int64': 'int64_t',
'float16': 'float',
'float32': 'float',
'float64': 'double',
}


def _get_cython_type(dtype):
    """
    Given a dtype, return the matching Cython C type name (e.g.
    'int64_t' or 'double'); dtypes with no specialization map to
    'object'.
    """
    # the diff artifact left two docstrings here; keep a single one
    type_name = _get_dtype(dtype).name
    return _cyton_types.get(type_name, 'object')


def _get_cython_type_upcast(dtype):
""" Upcast a dtype to 'int64_t', 'double', or 'object' """
if is_integer_dtype(dtype):
return 'int64_t'
elif is_float_dtype(dtype):
Expand Down Expand Up @@ -990,9 +1007,6 @@ def _validate_specification(self):
if not is_list_like(self.by):
self.by = [self.by]

if len(self.by) != 1:
raise MergeError("can only asof by a single key")

self.left_on = self.by + list(self.left_on)
self.right_on = self.by + list(self.right_on)

Expand Down Expand Up @@ -1046,6 +1060,11 @@ def _get_merge_keys(self):
def _get_join_indexers(self):
""" return the join indexers """

def flip_stringify(xs):
    """ flip an array of arrays and string-ify contents """
    # transpose so each output row holds one composite 'by' key, then
    # collapse each row to a hashable string with the Cython helper;
    # assumes xs is a sequence of equal-length key arrays -- TODO confirm
    xt = np.transpose(xs)
    return _join.stringify(_ensure_object(xt))

# values to compare
left_values = self.left_join_keys[-1]
right_values = self.right_join_keys[-1]
Expand All @@ -1067,22 +1086,23 @@ def _get_join_indexers(self):

# a "by" parameter requires special handling
if self.by is not None:
left_by_values = self.left_join_keys[0]
right_by_values = self.right_join_keys[0]

# choose appropriate function by type
on_type = _get_cython_type(left_values.dtype)
by_type = _get_cython_type(left_by_values.dtype)
if len(self.left_join_keys) > 2:
# get string representation of values if more than one
left_by_values = flip_stringify(self.left_join_keys[0:-1])
right_by_values = flip_stringify(self.right_join_keys[0:-1])
else:
left_by_values = self.left_join_keys[0]
right_by_values = self.right_join_keys[0]

on_type_caster = _type_casters[on_type]
# upcast 'by' parameter because HashTable is limited
by_type = _get_cython_type_upcast(left_by_values.dtype)
by_type_caster = _type_casters[by_type]
func = _asof_by_functions[(on_type, by_type)]

left_values = on_type_caster(left_values)
right_values = on_type_caster(right_values)
left_by_values = by_type_caster(left_by_values)
right_by_values = by_type_caster(right_by_values)

# choose appropriate function by type
on_type = _get_cython_type(left_values.dtype)
func = _asof_by_function(on_type, by_type)
return func(left_values,
right_values,
left_by_values,
Expand All @@ -1092,12 +1112,7 @@ def _get_join_indexers(self):
else:
# choose appropriate function by type
on_type = _get_cython_type(left_values.dtype)
type_caster = _type_casters[on_type]
func = _asof_functions[on_type]

left_values = type_caster(left_values)
right_values = type_caster(right_values)

func = _asof_function(on_type)
return func(left_values,
right_values,
self.allow_exact_matches,
Expand Down
Loading