dupegrouper

A Python library for grouping duplicate data efficiently.

Introduction

dupegrouper can be used for various deduplication use cases. Its intended purpose is to implement a uniform API that allows for both exact and near deduplication, whilst also offering record selection based on the first instance of a set of duplicates, i.e. a "group".

Deduplicating data is a hard task: validating an approach takes time, and can require a lot of testing and iterating through approaches that may, or may not, be applicable to your dataset.

dupegrouper abstracts away the task of actually deduplicating, so that you can focus on the most important thing: implementing an appropriate "strategy" to achieve your stated end goal ...

...In fact a "strategy" is key to dupegrouper's API. dupegrouper has:

Ready-to-use deduplication strategies

dupegrouper currently offers the following deduplication strategies:

string type       numeric type
Exact string      Jaccard*
Fuzzy matching    Cosine similarity*
TfIdf             -
LSH*              -

* due for implementation in a future version

You can also implement custom deduplication logic, which dupegrouper can readily accept, as described in Custom Strategies.

Multiple backend support

dupegrouper aims to scale in line with your problem. The following backends are currently supported:

  • Pandas
  • Polars
  • PySpark

A flexible API

Check out the API Documentation.

Installation

pip install dupegrouper

Example

import dupegrouper

dg = dupegrouper.DupeGrouper(df) # input dataframe

dg.add_strategy(dupegrouper.strategies.Exact())

dg.dedupe("address")

dg.df # retrieve dataframe

Usage Guide

Adding Strategies

dupegrouper comes with ready-to-use deduplication strategies, available under dupegrouper.strategies.

Strategies can be added one by one and are executed in the order in which they are added. In the case below, the address column is first deduplicated exactly, and then using fuzzy matching.

# Deduplicate the address column

dg = dupegrouper.DupeGrouper(df)

dg.add_strategy(dupegrouper.strategies.Exact())
dg.add_strategy(dupegrouper.strategies.Fuzzy(tolerance=0.3))

dg.dedupe("address")
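
To make "executed in the order in which they are added" concrete, here is a minimal, library-independent sketch of two passes applied one after the other to the same group ids. It is not dupegrouper's implementation: the functions exact_pass and fuzzy_pass, the use of difflib, and the threshold parameter are all assumptions made for illustration, and threshold is not claimed to mean the same thing as the tolerance argument above.

```python
from difflib import SequenceMatcher


def exact_pass(values, group_ids):
    """Give identical values the group id of their first occurrence."""
    first_seen = {}
    out = []
    for value, gid in zip(values, group_ids):
        first_seen.setdefault(value, gid)
        out.append(first_seen[value])
    return out


def fuzzy_pass(values, group_ids, threshold=0.8):
    """Merge each record into the earliest record it closely resembles."""
    out = list(group_ids)
    for i, value in enumerate(values):
        for j in range(i):
            # Hypothetical similarity rule: difflib ratio at or above threshold
            if SequenceMatcher(None, value, values[j]).ratio() >= threshold:
                out[i] = out[j]
                break
    return out


addresses = ["1 Main St", "1 Main St", "1 Main Street", "9 Elm Rd"]
group_ids = list(range(1, len(addresses) + 1))  # every row starts in its own group

# Passes run in the order listed; each one sees the result of the previous one
for strategy in (exact_pass, fuzzy_pass):
    group_ids = strategy(addresses, group_ids)

print(group_ids)
```

The exact pass collapses the two identical rows, and the fuzzy pass then folds the near-match into the same group, which is why ordering cheap strategies before expensive ones tends to be a sensible default.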

Alternatively, you can add a map of strategies. In this case, strategies are executed in their defined order for each map key. The implementation below produces the same result as the one above.

# Also deduplicates the address column

dg = dupegrouper.DupeGrouper(df)

dg.add_strategy({
    "address": [
        dupegrouper.strategies.Exact(),
        dupegrouper.strategies.Fuzzy(tolerance=0.3),
    ]
})

print(dg.strategies)
# {'address': ('Exact', 'Fuzzy')}

dg.dedupe() # No Argument!

A call of dedupe() will reset the strategies:

...
print(dg.strategies)
# {'address': ('Exact', 'Fuzzy')}
dg.dedupe()
print(dg.strategies)
# None

Custom Strategies

Maybe you need some custom deduplication methodology. An instance of dupegrouper.DupeGrouper can accept custom functions too.

def my_func(df, attr: str, /, **kwargs) -> dict[str, str]:
    my_map = {}
    for row in df:
        # e.g. use **kwargs
        my_map = ...
    return my_map

Above, my_func is a (very) boilerplate custom deduplication implementation:

  • it accepts a dataframe (df)
  • it will deduplicate on a specific attribute (attr)
  • it accepts other keyword arguments specific to your problem (**kwargs)

Look closely at the function signature: your function needs to implement this exactly. Additionally, it must return a map in which each key-value pair represents a deduplication match, with the value being the newly selected record (the "group").
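
As a more concrete illustration of that contract, the sketch below fills in the boilerplate with a simple substring rule. The function name match_substring and the match_str keyword are hypothetical, and the sketch assumes only that df[attr] yields the column's values (true of a pandas DataFrame); what object dupegrouper actually hands to the callable is not specified here. A plain dict stands in for the dataframe so the example runs without any dependencies.

```python
def match_substring(df, attr: str, /, **kwargs) -> dict[str, str]:
    """Map every value containing `match_str` to the first such value."""
    needle = kwargs["match_str"].lower()
    # Assumption: df[attr] iterates over the values of the column
    matches = [value for value in df[attr] if needle in str(value).lower()]
    # The first matching value becomes the selected record for the group;
    # with no matches the comprehension is empty and an empty map is returned
    return {value: matches[0] for value in matches}


# A dict of lists stands in for a dataframe in this sketch
frame = {"address": ["12 London Rd", "Flat 2, LONDON", "5 Rue de Paris"]}
mapping = match_substring(frame, "address", match_str="london")
print(mapping)
```

Values that do not match are simply left out of the returned map in this sketch; whether unmatched values should appear in the map is an assumption to check against the library's behaviour.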

Warning

In the current implementation, there is no guarantee that a generator can be used to yield deduplicated value maps

You can proceed to add your custom function as a strategy:

dg = dupegrouper.DupeGrouper(df)

dg.add_strategy((my_func, {"match_str": "london"}))

print(dg.strategies) # returns ("my_func",)

dg.dedupe("address")

Warning

In the current implementation, any custom callable will also always dedupe exact matches!

Creating a Comprehensive Strategy

You can use the above techniques for a comprehensive strategy to deduplicate your data:

import dupegrouper
import pandas as pd # or polars / pyspark

df = pd.read_csv("example.csv")

dg = dupegrouper.DupeGrouper(df)

strategies = {
    "address": [
        dupegrouper.strategies.Exact(),
        dupegrouper.strategies.Fuzzy(tolerance=0.5),
        (my_func, {"match_str": "london"}), # any address that contains "london"
    ],
    "email": [
        dupegrouper.strategies.Exact(),
        dupegrouper.strategies.Fuzzy(tolerance=0.3),
        dupegrouper.strategies.TfIdf(tolerance=0.4, ngram=3, topn=2),
    ],
}

dg.add_strategy(strategies)

dg.dedupe()

df = dg.df

Using the PySpark backend

dupegrouper can be used as described in Creating a Comprehensive Strategy. dupegrouper is partition-aware, and will deduplicate each partition, per worker node, given the defined strategy. Such a distributed implementation puts the onus on appropriate planning:

  • partitions must already be containers of expected duplicates
  • partitioning (or re-partitioning) must be planned ahead of time

The above problem is typically dealt with using a "blocking key", which serves as the partitioning/repartitioning key. Whilst several approaches may be valid, a blocking key can typically be computed as a general property shared by records that are expected to contain duplicates. As an example, that might be the first N characters of a given attribute needing deduplication.
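
The "first N characters" idea can be sketched in plain Python as follows. The helper blocking_key, its normalisation (strip and lowercase), and the sample records are assumptions for illustration only; on PySpark the equivalent step would usually be to add such a key as a column and repartition on it before handing the dataframe to dupegrouper.

```python
from collections import defaultdict


def blocking_key(value: str, n: int = 3) -> str:
    """Hypothetical blocking key: the first n characters, normalised."""
    return value.strip().lower()[:n]


records = [
    {"id": 1, "address": "12 London Rd"},
    {"id": 2, "address": "12 london road"},
    {"id": 3, "address": "5 Rue de Paris"},
    {"id": 4, "address": " 5 rue de paris"},
]

# Records sharing a key land in the same block, i.e. the same partition,
# so likely duplicates are compared with one another and not across blocks
blocks = defaultdict(list)
for record in records:
    blocks[blocking_key(record["address"])].append(record["id"])

print(dict(blocks))
```

A key that is too short produces a few very large blocks, while a key that is too long risks separating true duplicates, so N is worth tuning against a sample of the data.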

Extending the API for Custom Implementations

For simple custom implementations, it's recommended that you use the approach discussed for custom functions (see Custom Strategies).

However, you can derive directly from the abstract base class dupegrouper.strategy.DeduplicationStrategy, and thus make direct use of the efficient, core deduplication methods implemented in this library, as described in its API. This will expose a dedupe() method, ready for direct use within an instance of DupeGrouper, in much the same way that other dupegrouper.strategies are passed in as strategies.

About

License

This project is licensed under the Apache-2.0 License. See the LICENSE file for more details.

"""
.. include:: ../README.md
"""

from dupegrouper.base import DupeGrouper
from dupegrouper import strategy, strategies  # submodules


__all__ = [
    "DupeGrouper",
    "strategy",
    "strategies",
]

class DupeGrouper:
    """Top-level entrypoint for grouping duplicates

    This class handles initialisation of a dataframe, dispatching appropriately
    given the supported dataframe libraries (e.g. Pandas). An instance of this
    class can then accept a variety of strategies for deduplication and
    grouping.

    Upon initialisation, `DupeGrouper` sets a new column, usually `"group_id"`,
    but you can control this by setting an environment variable `GROUP_ID` at
    runtime. The group_id is a monotonically increasing, numeric id column
    running from 1 to the length of the dataframe provided.
    """

    def __init__(
        self,
        df: DataFrameLike,
        spark_session: SparkSession | None = None,
        id: str | None = None,
    ):
        self._df: WrappedDataFrame = _wrap(df, id)
        self._strategy_manager = _StrategyManager()
        self._spark_session = spark_session
        self._id = id

    @singledispatchmethod
    def _call_strategy_deduper(
        self,
        strategy: DeduplicationStrategy | tuple[typing.Callable, typing.Any],
        attr: str,
    ):
        """Dispatch the appropriate strategy deduplication method.

        If the strategy is an instance of a dupegrouper `DeduplicationStrategy`
        the strategy will have been added as such, with its parameters. In the
        case of a custom implementation of a Callable, passed as a tuple, we
        pass this *directly* to the `Custom` class and initialise that.

        Args:
            strategy: A `dupegrouper` deduplication strategy or a tuple
                containing a (custom) callable and its parameters.
            attr: The attribute used for deduplication.

        Returns:
            A deduplicated dataframe

        Raises:
            NotImplementedError.
        """
        del attr  # Unused

        raise NotImplementedError(f"Unsupported strategy: {type(strategy)}")

    @_call_strategy_deduper.register(DeduplicationStrategy)
    def _(self, strategy, attr) -> WrappedDataFrame:
        return strategy.with_frame(self._df).dedupe(attr)

    @_call_strategy_deduper.register(tuple)
    def _(self, strategy: tuple[typing.Callable, typing.Any], attr) -> WrappedDataFrame:
        func, kwargs = strategy
        return Custom(func, attr, **kwargs).with_frame(self._df).dedupe()

    @singledispatchmethod
    def _dedupe(
        self,
        attr: str | None,
        strategies: StrategyMapCollection,
    ):
        """Dispatch the appropriate deduplication logic.

        If strategies have been added individually, they are stored under a
        "default" key and retrieved as such when the public `.dedupe` method is
        called _with_ the attribute label. In the case of having added
        strategies in one go with a direct dict (mapping) object, the attribute
        label is first extracted from strategy collection dictionary keys.
        Upon completing deduplication the strategy collection is wiped for
        (any) subsequent deduplication.

        Args:
            attr: The attribute used for deduplication; or None in the case
                of strategies being a mapping object

        Returns:
            None; internal `_df` attribute is updated.

        Raises:
            NotImplementedError.
        """
        del strategies  # Unused
        raise NotImplementedError(f"Unsupported attribute type: {type(attr)}")

    @_dedupe.register(str)
    def _(self, attr, strategies):
        for strategy in strategies["default"]:
            self._df = self._call_strategy_deduper(strategy, attr)

    @_dedupe.register(NoneType)
    def _(self, attr, strategies):
        del attr  # Unused
        for attr, strategies in strategies.items():
            for strategy in strategies:
                self._df = self._call_strategy_deduper(strategy, attr)

    def _dedupe_spark(self, attr: str | None, strategies: StrategyMapCollection):
        """Spark specific deduplication helper

        Maps dataframe partitions to be processed via the RDD API yielding low-
        level list[Rows], which are then post-processed back to a dataframe.

        Args:
            attr: The attribute to deduplicate.
            strategies: the collection of strategies
        Returns:
            Instance's _df attribute is updated
        """
        id = typing.cast(str, self._id)
        id_type = typing.cast(DataType, PYSPARK_TYPES.get(dict(self._df.dtypes).get(id)))  # type: ignore

        deduped_rdd = self._df.rdd.mapPartitions(
            lambda partition_iter: _process_partition(partition_iter, strategies, id, attr)
        )

        if GROUP_ID in self._df.columns:
            schema = StructType(self._df.schema.fields)
        else:
            schema = StructType(self._df.schema.fields + [StructField(GROUP_ID, id_type, True)])

        self._df = WrappedSparkDataFrame(
            typing.cast(SparkSession, self._spark_session).createDataFrame(deduped_rdd, schema=schema), id
        )

    # PUBLIC API:

    @singledispatchmethod
    def add_strategy(self, strategy: DeduplicationStrategy | tuple | StrategyMapCollection):
        """
        Add a strategy to the strategy manager.

        Instances of `DeduplicationStrategy` or tuple are added to the
        "default" key. Mapping objects update the manager directly

        Args:
            strategy: A deduplication strategy, tuple, or strategy collection
                (mapping) to add.

        Returns:
            self is updated

        Raises:
            NotImplementedError
        """
        raise NotImplementedError(f"Unsupported strategy: {type(strategy)}")

    @add_strategy.register(DeduplicationStrategy)
    @add_strategy.register(tuple)
    def _(self, strategy):
        self._strategy_manager.add("default", strategy)

    @add_strategy.register(dict)
    def _(self, strategy: StrategyMapCollection):
        for attr, strat_list in strategy.items():
            for strat in strat_list:
                self._strategy_manager.add(attr, strat)

    def dedupe(self, attr: str | None = None):
        """dedupe, and group, the data based on the provided attribute

        Args:
            attr: The attribute to deduplicate. If strategies have been added
                as a mapping object, this must not be passed, as the keys of
                the mapping object will be used instead
        """
        strategies = self._strategy_manager.get()

        if isinstance(self._df, WrappedSparkDataFrame):
            self._dedupe_spark(attr, strategies)
        else:
            self._dedupe(attr, strategies)

        self._strategy_manager.reset()

    @property
    def strategies(self) -> None | tuple[str, ...] | dict[str, tuple[str, ...]]:
        """
        Returns the strategies currently stored in the strategy manager.

        If no strategies are stored, returns `None`. Otherwise, returns a tuple
        of strategy names or a dictionary mapping attributes to their
        respective strategies.

        Returns:
            The stored strategies, formatted
        """
        strategies = self._strategy_manager.get()
        if not strategies:
            return None

        def parse_strategies(dict_values):
            return tuple(
                [
                    (vx[0].__name__ if isinstance(vx, tuple) else vx.__class__.__name__)
                    #
                    for vx in dict_values
                ]
            )

        if "default" in strategies:
            return tuple([parse_strategies(v) for _, v in strategies.items()])[0]
        return {k: parse_strategies(v) for k, v in strategies.items()}

    @property
    def df(self) -> DataFrameLike:
        return self._df.unwrap()

Top-level entrypoint for grouping duplicates

This class handles initialisation of a dataframe, dispatching appropriately given the supported dataframe libraries (e.g. Pandas). An instance of this class can then accept a variety of strategies for deduplication and grouping.

Upon initialisation, DupeGrouper sets a new column, usually "group_id", but you can control this by setting an environment variable GROUP_ID at runtime. The group_id is a monotonically increasing, numeric id column running from 1 to the length of the dataframe provided.
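
A rough picture of what that initial column looks like, using plain dicts in place of a dataframe. How dupegrouper itself reads GROUP_ID is not shown in this page, so the os.environ.get lookup with a "group_id" fallback is an assumption made for this sketch.

```python
import os

# Assumption: the column name falls back to "group_id" when GROUP_ID is unset
group_col = os.environ.get("GROUP_ID", "group_id")

rows = [
    {"address": "12 London Rd"},
    {"address": "5 Rue de Paris"},
    {"address": "12 London Rd"},
]

# Before any deduplication every row is its own group: ids run from 1 to len(rows)
for position, row in enumerate(rows, start=1):
    row[group_col] = position

ids = [row[group_col] for row in rows]
print(group_col, ids)
```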

DupeGrouper( df: pandas.core.frame.DataFrame | polars.dataframe.frame.DataFrame | pyspark.sql.dataframe.DataFrame | list[pyspark.sql.types.Row], spark_session: pyspark.sql.session.SparkSession | None = None, id: str | None = None)
@singledispatchmethod
def add_strategy( self, strategy: Union[dupegrouper.strategy.DeduplicationStrategy, tuple, DefaultDict[str, list[dupegrouper.strategy.DeduplicationStrategy | tuple[Callable, dict[str, str]]]]]):

Add a strategy to the strategy manager.

Instances of DeduplicationStrategy or tuple are added to the "default" key. Mapping objects update the manager directly

Arguments:
  • strategy: A deduplication strategy, tuple, or strategy collection (mapping) to add.
Returns:

self is updated

Raises:
  • NotImplementedError
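
The type-based routing described above relies on functools.singledispatchmethod from the standard library. The toy class below mirrors that pattern in isolation so the behaviour can be tried without dupegrouper installed; ToyManager and its store attribute are stand-ins invented for this sketch, not part of the library.

```python
from functools import singledispatchmethod


class ToyManager:
    """Stand-in that routes add_strategy calls by argument type."""

    def __init__(self):
        self.store: dict[str, list] = {}

    @singledispatchmethod
    def add_strategy(self, strategy):
        # Fallback for any type without a registered handler
        raise NotImplementedError(f"Unsupported strategy: {type(strategy)}")

    @add_strategy.register(tuple)
    def _(self, strategy):
        # Single strategies are collected under the "default" key
        self.store.setdefault("default", []).append(strategy)

    @add_strategy.register(dict)
    def _(self, strategy):
        # Mappings contribute their own attribute keys
        for attr, strat_list in strategy.items():
            for strat in strat_list:
                self.store.setdefault(attr, []).append(strat)


manager = ToyManager()
manager.add_strategy((len, {"k": "v"}))
manager.add_strategy({"email": ["exact", "fuzzy"]})

try:
    manager.add_strategy(42)
except NotImplementedError as exc:
    error = str(exc)

print(manager.store)
print(error)
```

Unlike this toy, the real strategy manager may apply its own validation, for example when single strategies and mappings are mixed; that behaviour is not covered here.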
def dedupe(self, attr: str | None = None):

dedupe, and group, the data based on the provided attribute

Arguments:
  • attr: The attribute to deduplicate. If strategies have been added as a mapping object, this must not be passed, as the keys of the mapping object will be used instead
strategies: None | tuple[str, ...] | dict[str, tuple[str, ...]]

Returns the strategies currently stored in the strategy manager.

If no strategies are stored, returns None. Otherwise, returns a tuple of strategy names or a dictionary mapping attributes to their respective strategies.

Returns:

The stored strategies, formatted
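
The three possible return shapes can be reproduced with a small standalone function. This is a simplified restatement of the formatting logic, not the property itself: format_strategies, the empty Exact and Fuzzy classes, and my_func are stand-ins for this sketch, and it assumes that when a "default" key is present it is the only key.

```python
def format_strategies(stored: dict):
    """Return None, a tuple of names, or a dict of attribute to names."""
    if not stored:
        return None

    def names(items):
        # Custom callables arrive as (func, kwargs) tuples; others are instances
        return tuple(
            item[0].__name__ if isinstance(item, tuple) else item.__class__.__name__
            for item in items
        )

    if "default" in stored:
        return names(stored["default"])
    return {attr: names(items) for attr, items in stored.items()}


class Exact:  # stand-in for a built-in strategy class
    pass


class Fuzzy:  # stand-in for a built-in strategy class
    pass


def my_func(df, attr, /, **kwargs):  # stand-in custom callable
    return {}


single = format_strategies({"default": [Exact(), (my_func, {"match_str": "london"})]})
mapped = format_strategies({"address": [Exact(), Fuzzy()]})
empty = format_strategies({})
print(single, mapped, empty)
```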

df: pandas.core.frame.DataFrame | polars.dataframe.frame.DataFrame | pyspark.sql.dataframe.DataFrame | list[pyspark.sql.types.Row]