dupegrouper

A Python library for grouping duplicate data efficiently.


Introduction

dupegrouper can be used for various deduplication use cases. Its intended purpose is to provide a uniform API for both exact and near deduplication, while collecting duplicate instances into sets, i.e. "groups".

Deduplicating data is a hard task: validating an approach takes time, and can require a lot of testing and iterating through approaches that may or may not be applicable to your dataset.

dupegrouper abstracts away the task of actually deduplicating, so that you can focus on the most important thing: implementing an appropriate "strategy" to achieve your stated end goal.

In fact, a "strategy" is key to dupegrouper's API. dupegrouper has:

  • Ready-to-use deduplication strategies
  • Pandas and Polars support
  • A flexible API

Check out the API Documentation.

Installation

pip install dupegrouper

Example

import dupegrouper

dg = dupegrouper.DupeGrouper(df) # input dataframe

dg.add_strategy(dupegrouper.strategies.Exact())

dg.dedupe("address")

dg.df # retrieve dataframe
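To make the idea of a "group" concrete, here is a minimal sketch in plain Python of what exact grouping amounts to conceptually. It is not dupegrouper's implementation: the function name group_exact and the list-of-dicts rows are illustrative assumptions, and it assumes that duplicates take the group id of their first occurrence.

```python
def group_exact(rows, attr):
    """Return copies of rows with a `group_id`.

    Rows with an identical value for `attr` share the id of the first
    row in which that value was seen (ids start at 1).
    """
    first_seen = {}
    grouped = []
    for i, row in enumerate(rows, start=1):
        # setdefault stores i only the first time a value is encountered
        gid = first_seen.setdefault(row[attr], i)
        grouped.append({**row, "group_id": gid})
    return grouped


rows = [
    {"address": "12 High St"},
    {"address": "4 Mill Lane"},
    {"address": "12 High St"},
]
grouped = group_exact(rows, "address")
print([r["group_id"] for r in grouped])  # [1, 2, 1]
```

The first and third rows end up in the same group, while the data itself is left untouched.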

Usage Guide

Adding Strategies

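dupegrouper comes with ready-to-use deduplication strategies, including the dupegrouper.strategies.Exact, dupegrouper.strategies.Fuzzy and dupegrouper.strategies.TfIdf strategies used in this guide.

You can add these in the order you want to apply them: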

# Deduplicate the address column

dg = dupegrouper.DupeGrouper(df)

dg.add_strategy(dupegrouper.strategies.Exact())
dg.add_strategy(dupegrouper.strategies.Fuzzy(tolerance=0.3))

dg.dedupe("address")

Or, add a map of strategies:

# Also deduplicates the address column

dg = dupegrouper.DupeGrouper(df)

dg.add_strategy({
    "address": [
        dupegrouper.strategies.Exact(),
        dupegrouper.strategies.Fuzzy(tolerance=0.3),
    ]
})

dg.dedupe() # No Argument!

Custom Strategies

An instance of dupegrouper.DupeGrouper can accept custom functions too.

import pandas as pd


def my_func(df: pd.DataFrame, attr: str, /, match_str: str) -> dict[str, str]:
    """deduplicates df if any given row contains `match_str`"""
    my_map = {}
    for irow, _ in df.iterrows():
        left: str = df.at[irow, attr]
        my_map[left] = left  # by default, a value maps to itself
        for jrow, _ in df.iterrows():
            right: str = df.at[jrow, attr]
            # map `left` to the first value that also contains `match_str`
            if match_str in left.lower() and match_str in right.lower():
                my_map[left] = right
                break
    return my_map

Above, my_func is a custom implementation: it groups rows whose value contains the partial string match_str. You can then add your custom function as a strategy:

dg = dupegrouper.DupeGrouper(df)

dg.add_strategy((my_func, {"match_str": "london"}))

print(dg.strategies) # returns ("my_func",)

dg.dedupe("address")

Note

Your custom function's signature must take two positional-only arguments followed by keyword arguments:

(df: DataFrame, attr: str, /, **kwargs) -> dict[str, str]

Where attr is the attribute you wish to deduplicate.

Warning

In the current implementation, any custom callable will also always dedupe exact matches!
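One plausible reading of this warning: a custom callable returns a mapping keyed by attribute value, so two rows holding the identical value necessarily map to the same target. The sketch below illustrates that reasoning in plain Python. The helper assign_groups is hypothetical and not part of dupegrouper, and how the library actually consumes the mapping is an assumption.

```python
def assign_groups(values, mapping):
    """Give each row the 1-based index of the first row holding its mapped value."""
    first_index = {}
    for i, value in enumerate(values, start=1):
        first_index.setdefault(value, i)
    # Rows are looked up by value, so identical values always land together
    return [first_index[mapping.get(v, v)] for v in values]


values = ["London Rd", "london road", "Leeds St", "Leeds St"]

# The kind of mapping a custom callable matching "london" might return
mapping = {
    "London Rd": "London Rd",
    "london road": "London Rd",
    "Leeds St": "Leeds St",
}

group_ids = assign_groups(values, mapping)
print(group_ids)  # [1, 1, 3, 3]
```

The two "Leeds St" rows share a group even though neither contains "london", simply because they are exact matches.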

Creating a Comprehensive Strategy

You can use the above techniques for a comprehensive strategy to deduplicate your data:

import dupegrouper
import pandas as pd # or polars

df = pd.read_csv("example.csv")

dg = dupegrouper.DupeGrouper(df)

strategies = {
    "address": [
        dupegrouper.strategies.Exact(),
        dupegrouper.strategies.Fuzzy(tolerance=0.5),
        (my_func, {"match_str": "london"}),
    ],
    "email": [
        dupegrouper.strategies.Exact(),
        dupegrouper.strategies.Fuzzy(tolerance=0.3),
        dupegrouper.strategies.TfIdf(tolerance=0.4, ngram=3, topn=2),
    ],
}

dg.add_strategy(strategies)

dg.dedupe()

df = dg.df

Extending the API for Custom Implementations

For simple custom implementations, it's recommended that you use the approach discussed for custom functions (see Custom Strategies).

However, you can also derive directly from the abstract base class dupegrouper.strategy.DeduplicationStrategy, and thus make direct use of the efficient, core deduplication methods implemented in this library, as described in its API. This exposes a dedupe() method, ready for direct use within an instance of DupeGrouper, in much the same way that other dupegrouper.strategies are passed in as strategies.
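The general shape of this pattern can be sketched with a stand-in base class. Everything below is an assumption made for illustration: StrategySketch is not dupegrouper.strategy.DeduplicationStrategy, the rows are plain dicts rather than a dataframe, and only the chained `_set_df(...).dedupe(attr)` call mirrors what the DupeGrouper source shows.

```python
from abc import ABC, abstractmethod


class StrategySketch(ABC):
    """Stand-in base class (hypothetical, not the library's own)."""

    def _set_df(self, rows):
        self._rows = rows
        return self  # returning self allows `_set_df(rows).dedupe(attr)`

    @abstractmethod
    def dedupe(self, attr):
        """Return rows with updated group ids."""


class CaseInsensitive(StrategySketch):
    """Groups rows whose `attr` values match when case is ignored."""

    def dedupe(self, attr):
        first_seen = {}
        out = []
        for row in self._rows:
            key = row[attr].lower()
            # Reuse the group id of the first row with the same lowered value
            gid = first_seen.setdefault(key, row["group_id"])
            out.append({**row, "group_id": gid})
        return out


rows = [
    {"email": "A@x.com", "group_id": 1},
    {"email": "a@x.com", "group_id": 2},
    {"email": "b@x.com", "group_id": 3},
]
result = CaseInsensitive()._set_df(rows).dedupe("email")
print([r["group_id"] for r in result])  # [1, 1, 3]
```

Consult the API documentation of DeduplicationStrategy for the methods the real base class provides before deriving from it.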

About

License

This project is licensed under the Apache-2.0 License. See the LICENSE file for more details.

"""
.. include:: ../README.md
"""

from dupegrouper.base import DupeGrouper
from dupegrouper import strategy, strategies  # submodules


__all__ = [
    "DupeGrouper",
    "strategy",
    "strategies",
]
class DupeGrouper:
class DupeGrouper:
    """Top-level entrypoint for grouping duplicates

    This class handles initialisation of a dataframe, dispatching appropriately
    given the supported dataframe libraries (e.g. Pandas). An instance of this
    class can then accept a variety of strategies for deduplication and
    grouping.

    Upon initialisation, `DupeGrouper` sets a new column, usually `"group_id"`,
    but you can control this by setting an environment variable `GROUP_ID` at
    runtime. The group_id is a linearly increasing numeric id column, running
    from 1 to the length of the dataframe provided.
    """

    def __init__(self, df: pd.DataFrame):
        self._df = _InitDataFrame(df).choose
        self._strategy_manager = _StrategyManager()

    @singledispatchmethod
    def _call_strategy_deduper(
        self,
        strategy: DeduplicationStrategy | tuple[typing.Callable, typing.Any],
        attr: str,
    ):
        """Dispatch the appropriate strategy deduplication method.

        If the strategy is an instance of a dupegrouper `DeduplicationStrategy`
        the strategy will have been added as such, with its parameters. In the
        case of a custom implementation of a Callable, passed as a tuple, we
        pass this *directly* to the `Custom` class and initialise that.

        Args:
            strategy: A `dupegrouper` deduplication strategy or a tuple
                containing a (custom) callable and its parameters.
            attr: The attribute used for deduplication.

        Returns:
            A deduplicated dataframe

        Raises:
            NotImplementedError.
        """
        del attr  # Unused
        # Raise (rather than return) the error, as the docstring states
        raise NotImplementedError(f"Unsupported strategy: {type(strategy)}")

    @_call_strategy_deduper.register(DeduplicationStrategy)
    def _(self, strategy, attr):
        return strategy._set_df(self._df).dedupe(attr)

    @_call_strategy_deduper.register(tuple)
    def _(self, strategy: tuple[typing.Callable, typing.Any], attr):
        func, kwargs = strategy
        return Custom(func, attr, **kwargs)._set_df(self._df).dedupe()

    @singledispatchmethod
    def _dedupe(
        self,
        attr: str | None,
        strategy_collection: strategy_map_collection,
    ):
        """Dispatch the appropriate deduplication logic.

        If strategies have been added individually, they are stored under a
        "default" key and retrieved as such when the public `.dedupe` method is
        called _with_ the attribute label. In the case of having added
        strategies in one go with a direct dict (mapping) object, the attribute
        label is first extracted from strategy collection dictionary keys.
        Upon completing deduplication the strategy collection is wiped for
        (any) subsequent deduplication.

        Args:
            attr: The attribute used for deduplication; or None in the case
                of strategies being a mapping object

        Returns:
            None; internal `_df` attribute is updated.

        Raises:
            NotImplementedError.
        """
        del strategy_collection  # Unused
        raise NotImplementedError(f"Unsupported type: {type(attr)}")

    @_dedupe.register(str)
    def _(self, attr, strategy_collection):
        for strategy in strategy_collection["default"]:
            self._df = self._call_strategy_deduper(strategy, attr)
        self._strategy_manager.reset()

    @_dedupe.register(NoneType)
    def _(self, attr, strategy_collection):
        del attr  # Unused
        for attr, strategies in strategy_collection.items():
            for strategy in strategies:
                self._df = self._call_strategy_deduper(strategy, attr)
        self._strategy_manager.reset()

    # PUBLIC API:

    @singledispatchmethod
    def add_strategy(self, strategy: DeduplicationStrategy | tuple | strategy_map_collection):
        """
        Add a strategy to the strategy manager.

        Instances of `DeduplicationStrategy` or tuple are added to the
        "default" key. Mapping objects update the manager directly.

        Args:
            strategy: A deduplication strategy, tuple, or strategy collection
                (mapping) to add.

        Returns:
            self is updated

        Raises:
            NotImplementedError
        """
        # Raise (rather than return) the error, as the docstring states
        raise NotImplementedError(f"Unsupported strategy: {type(strategy)}")

    @add_strategy.register(DeduplicationStrategy)
    @add_strategy.register(tuple)
    def _(self, strategy):
        self._strategy_manager.add("default", strategy)

    @add_strategy.register(dict)
    def _(self, strategy: strategy_map_collection):
        for attr, strat_list in strategy.items():
            for strat in strat_list:
                self._strategy_manager.add(attr, strat)

    def dedupe(self, attr: str | None = None):
        """dedupe, and group, the data based on the provided attribute

        Args:
            attr: The attribute to deduplicate. If strategies have been added
                as a mapping object, this must not be passed, as the keys of
                the mapping object will be used instead
        """
        self._dedupe(attr, self._strategy_manager.get())

    @property
    def strategies(self) -> None | tuple[str, ...] | dict[str, tuple[str, ...]]:
        """
        Returns the strategies currently stored in the strategy manager.

        If no strategies are stored, returns `None`. Otherwise, returns a tuple
        of strategy names or a dictionary mapping attributes to their
        respective strategies.

        Returns:
            The stored strategies, formatted
        """
        strategies = self._strategy_manager.get()
        if not strategies:
            return None

        def parse_strategies(dict_values):
            return tuple([(vx[0].__name__ if isinstance(vx, tuple) else vx.__class__.__name__) for vx in dict_values])

        if "default" in strategies:
            return tuple([parse_strategies(v) for _, v in strategies.items()])[0]
        return {k: parse_strategies(v) for k, v in strategies.items()}

    @property
    def df(self) -> pd.DataFrame:
        return self._df

Top-level entrypoint for grouping duplicates

This class handles initialisation of a dataframe, dispatching appropriately given the supported dataframe libraries (e.g. Pandas). An instance of this class can then accept a variety of strategies for deduplication and grouping.

Upon initialisation, DupeGrouper sets a new column, usually "group_id", but you can control this by setting an environment variable GROUP_ID at runtime. The group_id is a linearly increasing numeric id column, running from 1 to the length of the dataframe provided.
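As a rough illustration of the behaviour described above, the sketch below picks the column name from the GROUP_ID environment variable, falling back to "group_id", and builds ids running from 1 to the number of rows. The helper init_group_ids is hypothetical, and how dupegrouper reads the variable internally is an assumption.

```python
import os


def init_group_ids(n_rows):
    """Return (column_name, ids), with ids running from 1 to n_rows."""
    # Fall back to "group_id" when GROUP_ID is not set in the environment
    column = os.environ.get("GROUP_ID", "group_id")
    return column, list(range(1, n_rows + 1))


column, ids = init_group_ids(4)
print(ids)  # [1, 2, 3, 4]
```

Setting the variable before starting Python, for example `GROUP_ID=cluster_id`, would change the column name in this sketch without any code change.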

DupeGrouper(df: pandas.core.frame.DataFrame)
@singledispatchmethod
def add_strategy( self, strategy: Union[dupegrouper.strategy.DeduplicationStrategy, tuple, DefaultDict[str, list[dupegrouper.strategy.DeduplicationStrategy | tuple[Callable, dict[str, str]]]]]):

Add a strategy to the strategy manager.

Instances of DeduplicationStrategy or tuple are added to the "default" key. Mapping objects update the manager directly.

Arguments:
  • strategy: A deduplication strategy, tuple, or strategy collection (mapping) to add.
Returns:

self is updated

Raises:
  • NotImplementedError
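The two ways of adding strategies can be pictured with a small stand-in for the strategy manager. ManagerSketch is hypothetical and only mimics the `add` and `get` calls visible in the DupeGrouper source. Strategy objects are replaced by plain strings to keep the sketch self-contained.

```python
from collections import defaultdict


class ManagerSketch:
    """Hypothetical stand-in for the internal strategy manager."""

    def __init__(self):
        self._strategies = defaultdict(list)

    def add(self, key, strategy):
        self._strategies[key].append(strategy)

    def get(self):
        return self._strategies


# Strategies added one at a time are stored under the "default" key
individual = ManagerSketch()
individual.add("default", "Exact")
individual.add("default", "Fuzzy")

# Strategies added as a mapping are stored under their attribute names
mapped = ManagerSketch()
for attr, strats in {"address": ["Exact"], "email": ["Exact", "TfIdf"]}.items():
    for strat in strats:
        mapped.add(attr, strat)

print(dict(individual.get()))  # {'default': ['Exact', 'Fuzzy']}
print(dict(mapped.get()))      # {'address': ['Exact'], 'email': ['Exact', 'TfIdf']}
```

This is why dedupe() needs an attribute name in the first case but not in the second: only the mapping carries the attribute names itself.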
def dedupe(self, attr: str | None = None):

dedupe, and group, the data based on the provided attribute

Arguments:
  • attr: The attribute to deduplicate. If strategies have been added as a mapping object, this must not be passed, as the keys of the mapping object will be used instead.
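The source shows that dedupe delegates to a method dispatched on the type of attr using functools.singledispatchmethod. The following self-contained sketch shows that dispatch pattern in isolation. DispatchSketch and its return values are illustrative assumptions, and `type(None)` is used in place of an imported NoneType.

```python
from functools import singledispatchmethod


class DispatchSketch:
    """Illustrates dispatching on whether `attr` is a str or None."""

    @singledispatchmethod
    def run(self, attr, collection):
        raise NotImplementedError(f"Unsupported type: {type(attr)}")

    @run.register(str)
    def _(self, attr, collection):
        # An explicit attribute: use the strategies under the "default" key
        return [(attr, s) for s in collection["default"]]

    @run.register(type(None))
    def _(self, attr, collection):
        # No attribute: take the attribute names from the mapping keys
        return [(key, s) for key, strats in collection.items() for s in strats]


sketch = DispatchSketch()
print(sketch.run("address", {"default": ["Exact", "Fuzzy"]}))
# [('address', 'Exact'), ('address', 'Fuzzy')]
print(sketch.run(None, {"email": ["Exact"]}))
# [('email', 'Exact')]
```

Any other argument type, such as an integer, falls through to the base method and raises NotImplementedError.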
strategies: None | tuple[str, ...] | dict[str, tuple[str, ...]]

Returns the strategies currently stored in the strategy manager.

If no strategies are stored, returns None. Otherwise, returns a tuple of strategy names or a dictionary mapping attributes to their respective strategies.

Returns:

The stored strategies, formatted
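The name formatting can be reproduced in isolation. In the sketch below, the Exact class and my_func are stand-ins defined locally, not the library's objects. The rule itself follows the source: a tuple contributes the name of its callable, anything else contributes its class name.

```python
def strategy_names(items):
    """Return display names for a sequence of strategies."""
    return tuple(
        item[0].__name__ if isinstance(item, tuple) else type(item).__name__
        for item in items
    )


class Exact:
    """Stand-in class, not dupegrouper.strategies.Exact."""


def my_func(df, attr, /, **kwargs):
    """Stand-in custom callable."""
    return {}


names = strategy_names([Exact(), (my_func, {"match_str": "london"})])
print(names)  # ('Exact', 'my_func')
```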

df: pandas.core.frame.DataFrame