dupegrouper
A Python library for grouping duplicate data efficiently.
Introduction
dupegrouper can be used for various deduplication use cases. Its intended purpose is to implement a uniform API that allows for both exact and near deduplication, whilst also offering record selection based on the first instance of a set of duplicates, i.e. a "group".
Deduplicating data is a hard task: validating approaches takes time and can require a lot of testing and iterating through approaches that may, or may not, be applicable to your dataset.
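To make "record selection based on the first instance" concrete, here is a minimal, library-independent sketch of exact deduplication. It is plain Python over a list of dict rows, not dupegrouper's implementation. `exact_group` is a hypothetical helper, and the `group_id` field only mirrors the column described in the API documentation.

```python
def exact_group(records: list[dict], attr: str, id_col: str = "group_id") -> list[dict]:
    """Hypothetical helper: give each row the id of the first row sharing its `attr` value."""
    first_seen: dict[object, int] = {}  # attr value -> id of its first instance
    grouped = []
    for position, record in enumerate(records, start=1):
        # The first occurrence is the selected record; later duplicates reuse its id.
        group_id = first_seen.setdefault(record[attr], position)
        grouped.append({**record, id_col: group_id})
    return grouped


rows = [
    {"address": "10 Downing St"},
    {"address": "221B Baker St"},
    {"address": "10 Downing St"},
]
for row in exact_group(rows, "address"):
    print(row)
```

The third row ends up in the group of the first row, because that is where its value was first seen.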
dupegrouper abstracts away the task of actually deduplicating, so that you can focus on the most important thing: implementing an appropriate "strategy" to achieve your stated end goal ...
...In fact, a "strategy" is key to dupegrouper's API. dupegrouper has:
Ready-to-use deduplication strategies
dupegrouper currently offers the following deduplication strategies:
| String type | Numeric type |
|---|---|
| Exact string | Jaccard* |
| Fuzzy matching | Cosine similarity* |
| TfIdf | - |
| LSH* | - |
* due for implementation in a future version
You can also implement custom deduplication logic, which dupegrouper can readily accept, as described in Custom Strategies.
Multiple backend support
dupegrouper aims to scale in line with your problem. The following backends are currently supported:
- Pandas
- Polars
- PySpark
A flexible API
Check out the API Documentation.
Installation
pip install dupegrouper
Example
import dupegrouper
dg = dupegrouper.DupeGrouper(df) # input dataframe
dg.add_strategy(dupegrouper.strategies.Exact())
dg.dedupe("address")
dg.df # retrieve dataframe
Usage Guide
Adding Strategies
dupegrouper comes with ready-to-use deduplication strategies.
Strategies can be added one by one and are executed in the order in which they are added. In the case below, the address column will first be deduplicated exactly, and then using fuzzy matching.
# Deduplicate the address column
dg = dupegrouper.DupeGrouper(df)
dg.add_strategy(dupegrouper.strategies.Exact())
dg.add_strategy(dupegrouper.strategies.Fuzzy(tolerance=0.3))
dg.dedupe("address")
Alternatively, you can add a map of strategies. In this case, strategies are executed in their defined order for each map key. The implementation below produces the same result as above.
# Also deduplicates the address column
dg = dupegrouper.DupeGrouper(df)
dg.add_strategy({
    "address": [
        dupegrouper.strategies.Exact(),
        dupegrouper.strategies.Fuzzy(tolerance=0.3),
    ]
})
print(dg.strategies)
# {'address': ('Exact', 'Fuzzy')}
dg.dedupe()  # No argument!
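Whichever form you use, each strategy can be pictured as a pass over the column that refines the group ids left by the previous pass. The plain-Python sketch below illustrates that ordering and is not dupegrouper's implementation. `exact_pass` and `fuzzy_pass` are hypothetical helpers, the similarity score comes from the standard library's `difflib.SequenceMatcher.ratio()`, and reading `tolerance` as a maximum distance of `1 - ratio` is an assumption about how `Fuzzy(tolerance=...)` behaves.

```python
from difflib import SequenceMatcher


def exact_pass(values: list[str], groups: list[int]) -> list[int]:
    # Identical values take the group id of their first occurrence.
    first: dict[str, int] = {}
    return [first.setdefault(value, group) for value, group in zip(values, groups)]


def fuzzy_pass(values: list[str], groups: list[int], tolerance: float) -> list[int]:
    # Assumption: `tolerance` is the maximum allowed distance, i.e. 1 - similarity.
    out = list(groups)
    for i, current in enumerate(values):
        for j in range(i):
            if SequenceMatcher(None, values[j], current).ratio() >= 1 - tolerance:
                out[i] = out[j]  # join the group of the earliest sufficiently close value
                break
    return out


addresses = ["10 Downing Street", "10 Downing St", "221B Baker Street", "10 Downing Street"]
groups = list(range(1, len(addresses) + 1))  # every row starts in its own group
groups = exact_pass(addresses, groups)  # [1, 2, 3, 1]
groups = fuzzy_pass(addresses, groups, tolerance=0.3)  # [1, 1, 3, 1]
```

Here the fuzzy pass merges "10 Downing St" into the group of "10 Downing Street" (a similarity of roughly 0.87), while "221B Baker Street" stays in its own group.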
A call to dedupe() will reset the strategies:
...
print(dg.strategies)
# {'address': ('Exact', 'Fuzzy', 'TfIdf')}
dg.dedupe()
print(dg.strategies)
# None
Custom Strategies
Maybe you need some custom deduplication methodology. An instance of dupegrouper.DupeGrouper can accept custom functions too.
def my_func(df, attr: str, /, **kwargs) -> dict[str, str]:
    my_map = {}
    for row in df:
        # e.g. use **kwargs to decide whether this row matches, then record
        # the match as my_map[matched_value] = selected_value
        ...
    return my_map
Above, my_func is a (very) boilerplate custom deduplication implementation:
- it accepts a dataframe (df)
- it will deduplicate on a specific attribute (attr)
- and it accepts other keyword arguments specific to your problem (**kwargs)
Look closely at the function signature: your function needs to implement it exactly. Additionally, it produces a map in which each key-value pair represents a deduplication match, and the value is the newly selected record (the "group").
In the current implementation, there is no guarantee that a generator can be used to yield deduplicated value maps.
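As one possible way to fill in the boilerplate, the sketch below maps every value containing a `match_str` keyword argument onto the first such value. It is hypothetical: `df` is modelled as an iterable of dict rows for illustration. If your backend hands over a pandas DataFrame, note that iterating directly over it yields column labels rather than rows, so you would iterate over the `attr` column instead.

```python
def my_func(df, attr: str, /, **kwargs) -> dict[str, str]:
    # Hypothetical body; `df` is assumed to be an iterable of dict-like rows.
    match_str = kwargs.get("match_str", "").lower()
    my_map: dict[str, str] = {}
    selected = None  # first matching value, used as the group's selected record
    for row in df:
        value = row[attr]
        if match_str and match_str in value.lower():
            if selected is None:
                selected = value
            my_map[value] = selected  # key: matched value, value: selected record
    return my_map


rows = [
    {"address": "1 High St, London"},
    {"address": "Flat 2, 1 High St, LONDON"},
    {"address": "5 Main Rd, Leeds"},
]
print(my_func(rows, "address", match_str="london"))
```

Values that do not match (the Leeds address here) are simply left out of the map.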
You can proceed to add your custom function as a strategy:
dg = dupegrouper.DupeGrouper(df)
dg.add_strategy((my_func, {"match_str": "london"}))
print(dg.strategies) # returns ("my_func",)
dg.dedupe("address")
In the current implementation, any custom callable will also always dedupe exact matches!
Creating a Comprehensive Strategy
You can use the above techniques for a comprehensive strategy to deduplicate your data:
import dupegrouper
import pandas as pd  # or: polars | pyspark
df = pd.read_csv("example.csv")
dg = dupegrouper.DupeGrouper(df)
strategies = {
    "address": [
        dupegrouper.strategies.Exact(),
        dupegrouper.strategies.Fuzzy(tolerance=0.5),
        (my_func, {"match_str": "london"}),  # any address that contains "london"
    ],
    "email": [
        dupegrouper.strategies.Exact(),
        dupegrouper.strategies.Fuzzy(tolerance=0.3),
        dupegrouper.strategies.TfIdf(tolerance=0.4, ngram=3, topn=2),
    ],
}
dg.add_strategy(strategies)
dg.dedupe()
df = dg.df
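The TfIdf strategy above takes `tolerance`, `ngram` and `topn`, but how dupegrouper interprets them is not documented here. The pure-Python sketch below shows one common form of the technique under stated assumptions: values are split into character n-grams of size `ngram`, weighted by smoothed TF-IDF, compared by cosine similarity, and each value keeps at most `topn` neighbours whose similarity is at least `1 - tolerance`. All function names are hypothetical; a production implementation would more likely use sparse matrices.

```python
import math
from collections import Counter


def char_ngrams(text: str, n: int) -> list[str]:
    # Overlapping character n-grams of the lower-cased text; short texts yield one gram.
    text = text.lower()
    return [text[i : i + n] for i in range(max(len(text) - n + 1, 1))]


def tfidf_vectors(values: list[str], n: int) -> list[dict[str, float]]:
    counts = [Counter(char_ngrams(v, n)) for v in values]
    doc_freq = Counter(gram for c in counts for gram in c)  # values containing each gram
    total = len(values)
    # Smoothed inverse document frequency: rarer n-grams get larger weights.
    idf = {gram: math.log((1 + total) / (1 + freq)) + 1 for gram, freq in doc_freq.items()}
    vectors = []
    for c in counts:
        weights = {gram: tf * idf[gram] for gram, tf in c.items()}
        norm = math.sqrt(sum(w * w for w in weights.values()))
        vectors.append({gram: w / norm for gram, w in weights.items()})  # L2-normalise
    return vectors


def cosine(a: dict[str, float], b: dict[str, float]) -> float:
    # Both vectors have unit length, so their dot product is the cosine similarity.
    return sum(w * b.get(gram, 0.0) for gram, w in a.items())


def tfidf_matches(
    values: list[str], ngram: int = 3, topn: int = 2, tolerance: float = 0.4
) -> dict[str, list[str]]:
    vectors = tfidf_vectors(values, ngram)
    matches = {}
    for i, vi in enumerate(vectors):
        # Rank every other value by similarity and keep at most `topn` close ones.
        scored = sorted(
            ((cosine(vi, vj), j) for j, vj in enumerate(vectors) if j != i), reverse=True
        )
        matches[values[i]] = [values[j] for score, j in scored[:topn] if score >= 1 - tolerance]
    return matches


print(tfidf_matches(["10 Downing Street", "10 Downing St", "221B Baker Street"]))
```

Because shared n-grams such as " st" or "eet" appear in many values, they receive lower weights than distinctive ones, which keeps "221B Baker Street" from matching "10 Downing Street" despite the common suffix.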
Using the PySpark backend
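dupegrouper can be used as described in Creating a Comprehensive Strategy. With a PySpark dataframe, also pass the active SparkSession (spark_session) and the name of the dataframe's id column (id) to DupeGrouper, as both are used when the deduplicated partitions are rebuilt into a dataframe. dupegrouper is partition-aware and will deduplicate each partition, per worker node, given the defined strategy. Such a distributed implementation puts the onus on appropriate planning: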
- partitions must already be containers of expected duplicates
- partitioning (or re-partitioning) must be planned ahead of time
The above problem is typically dealt with through a "blocking key", which serves as the partitioning (or repartitioning) key. Whilst several approaches may be valid, a blocking key is typically computed as a general property shared by records that are expected to contain duplicates. For example, it might be the first N characters of the attribute being deduplicated.
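The sketch below shows the blocking idea in plain Python, with each block standing in for one partition. `blocking_key` and `block_records` are hypothetical helpers, and trimming plus lower-casing before taking the first N characters is an assumed normalisation choice, not something dupegrouper prescribes.

```python
from collections import defaultdict


def blocking_key(value: str, n: int) -> str:
    # First n characters after trimming and lower-casing, so trivial case or
    # whitespace differences do not send likely duplicates to different blocks.
    return value.strip().lower()[:n]


def block_records(records: list[dict], attr: str, n: int) -> dict[str, list[dict]]:
    # Group records by blocking key; each group stands in for one partition.
    blocks: dict[str, list[dict]] = defaultdict(list)
    for record in records:
        blocks[blocking_key(record[attr], n)].append(record)
    return dict(blocks)


records = [
    {"email": "john.smith@example.com"},
    {"email": " John.Smith@example.com"},
    {"email": "jane.doe@example.com"},
]
blocks = block_records(records, "email", n=4)
print({key: len(group) for key, group in blocks.items()})
```

In PySpark, the same idea would typically mean deriving a key column (for example with `pyspark.sql.functions.substring`) and calling `DataFrame.repartition` on that column before `dedupe()`, so rows sharing a key land in the same partition. A key that is too coarse produces very large partitions, while one that is too specific risks splitting genuine duplicates.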
Extending the API for Custom Implementations
For simple custom implementations, it's recommended that you use the approach discussed for custom functions (see Custom Strategies).
However, you can derive directly from the abstract base class dupegrouper.strategy.DeduplicationStrategy, and thus make direct use of the efficient, core deduplication methods implemented in this library, as described in its API. This will expose a dedupe() method, ready for direct use within an instance of DupeGrouper, so that your subclass can be passed to add_strategy in much the same way as the built-in dupegrouper.strategies.
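The source below shows DupeGrouper calling `strategy.with_frame(df).dedupe(attr)` on strategy instances. The sketch mimics that chained-call pattern with a stand-alone abstract class so it runs without dupegrouper installed. `StrategySketch` and `CaseInsensitiveExact` are hypothetical, and the real DeduplicationStrategy may define different or additional abstract methods, so check its API documentation before subclassing.

```python
from abc import ABC, abstractmethod


class StrategySketch(ABC):
    """Hypothetical stand-in for a strategy base class; not dupegrouper's DeduplicationStrategy."""

    def with_frame(self, frame):
        # Attach the data and return self, so calls chain as
        # strategy.with_frame(frame).dedupe(attr).
        self.frame = frame
        return self

    @abstractmethod
    def dedupe(self, attr: str) -> list[dict]:
        """Return the rows with a group id assigned."""


class CaseInsensitiveExact(StrategySketch):
    """Hypothetical strategy: exact matching after trimming and lower-casing."""

    def dedupe(self, attr: str) -> list[dict]:
        first: dict[str, int] = {}
        out = []
        for position, row in enumerate(self.frame, start=1):
            key = row[attr].strip().lower()
            out.append({**row, "group_id": first.setdefault(key, position)})
        return out


rows = [{"email": "A@x.com"}, {"email": "a@x.com "}, {"email": "b@x.com"}]
deduped = CaseInsensitiveExact().with_frame(rows).dedupe("email")
print([row["group_id"] for row in deduped])
```

Declaring `dedupe` abstract means a subclass that forgets to implement it fails at instantiation rather than midway through deduplication.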
About
License
This project is licensed under the Apache-2.0 License. See the LICENSE file for more details.
class DupeGrouper:
    """Top-level entrypoint for grouping duplicates

    This class handles initialisation of a dataframe, dispatching appropriately
    given the supported dataframe libraries (e.g. Pandas). An instance of this
    class can then accept a variety of strategies for deduplication and
    grouping.

    Upon initialisation, `DupeGrouper` sets a new column, usually `"group_id"`,
    but you can control this by setting an environment variable `GROUP_ID` at
    runtime. The group_id is a monotonically increasing, numeric id column
    starting at 1 to the length of the dataframe provided.
    """

    def __init__(
        self,
        df: DataFrameLike,
        spark_session: SparkSession | None = None,
        id: str | None = None,
    ):
        self._df: WrappedDataFrame = _wrap(df, id)
        self._strategy_manager = _StrategyManager()
        self._spark_session = spark_session
        self._id = id

    @singledispatchmethod
    def _call_strategy_deduper(
        self,
        strategy: DeduplicationStrategy | tuple[typing.Callable, typing.Any],
        attr: str,
    ):
        """Dispatch the appropriate strategy deduplication method.

        If the strategy is an instance of a dupegrouper `DeduplicationStrategy`
        the strategy will have been added as such, with its parameters. In the
        case of a custom implementation of a Callable, passed as a tuple, we
        pass this *directly* to the `Custom` class and initialise that.

        Args:
            strategy: A `dupegrouper` deduplication strategy or a tuple
                containing a (custom) callable and its parameters.
            attr: The attribute used for deduplication.

        Returns:
            A deduplicated dataframe

        Raises:
            NotImplementedError.
        """
        del attr  # Unused

        raise NotImplementedError(f"Unsupported strategy: {type(strategy)}")

    @_call_strategy_deduper.register(DeduplicationStrategy)
    def _(self, strategy, attr) -> WrappedDataFrame:
        return strategy.with_frame(self._df).dedupe(attr)

    @_call_strategy_deduper.register(tuple)
    def _(self, strategy: tuple[typing.Callable, typing.Any], attr) -> WrappedDataFrame:
        func, kwargs = strategy
        return Custom(func, attr, **kwargs).with_frame(self._df).dedupe()

    @singledispatchmethod
    def _dedupe(
        self,
        attr: str | None,
        strategies: StrategyMapCollection,
    ):
        """Dispatch the appropriate deduplication logic.

        If strategies have been added individually, they are stored under a
        "default" key and retrieved as such when the public `.dedupe` method is
        called _with_ the attribute label. In the case of having added
        strategies in one go with a direct dict (mapping) object, the attribute
        label is first extracted from strategy collection dictionary keys.
        Upon completing deduplication the strategy collection is wiped for
        (any) subsequent deduplication.

        Args:
            attr: The attribute used for deduplication; or None in the case
                of strategies being a mapping object

        Returns:
            None; internal `_df` attribute is updated.

        Raises:
            NotImplementedError.
        """
        del strategies  # Unused
        raise NotImplementedError(f"Unsupported attribute type: {type(attr)}")

    @_dedupe.register(str)
    def _(self, attr, strategies):
        for strategy in strategies["default"]:
            self._df = self._call_strategy_deduper(strategy, attr)

    @_dedupe.register(NoneType)
    def _(self, attr, strategies):
        del attr  # Unused
        for attr, strategies in strategies.items():
            for strategy in strategies:
                self._df = self._call_strategy_deduper(strategy, attr)

    def _dedupe_spark(self, attr: str | None, strategies: StrategyMapCollection):
        """Spark specific deduplication helper

        Maps dataframe partitions to be processed via the RDD API yielding
        low-level list[Rows], which are then post-processed back to a dataframe.

        Args:
            attr: The attribute to deduplicate.
            strategies: the collection of strategies
        Returns:
            Instance's _df attribute is updated
        """
        id = typing.cast(str, self._id)
        id_type = typing.cast(DataType, PYSPARK_TYPES.get(dict(self._df.dtypes).get(id)))  # type: ignore

        deduped_rdd = self._df.rdd.mapPartitions(
            lambda partition_iter: _process_partition(partition_iter, strategies, id, attr)
        )

        if GROUP_ID in self._df.columns:
            schema = StructType(self._df.schema.fields)
        else:
            schema = StructType(self._df.schema.fields + [StructField(GROUP_ID, id_type, True)])

        self._df = WrappedSparkDataFrame(
            typing.cast(SparkSession, self._spark_session).createDataFrame(deduped_rdd, schema=schema), id
        )

    # PUBLIC API:

    @singledispatchmethod
    def add_strategy(self, strategy: DeduplicationStrategy | tuple | StrategyMapCollection):
        """
        Add a strategy to the strategy manager.

        Instances of `DeduplicationStrategy` or tuple are added to the
        "default" key. Mapping objects update the manager directly.

        Args:
            strategy: A deduplication strategy, tuple, or strategy collection
                (mapping) to add.

        Returns:
            self is updated

        Raises:
            NotImplementedError
        """
        raise NotImplementedError(f"Unsupported strategy: {type(strategy)}")

    @add_strategy.register(DeduplicationStrategy)
    @add_strategy.register(tuple)
    def _(self, strategy):
        self._strategy_manager.add("default", strategy)

    @add_strategy.register(dict)
    def _(self, strategy: StrategyMapCollection):
        for attr, strat_list in strategy.items():
            for strat in strat_list:
                self._strategy_manager.add(attr, strat)

    def dedupe(self, attr: str | None = None):
        """dedupe, and group, the data based on the provided attribute

        Args:
            attr: The attribute to deduplicate. If strategies have been added
                as a mapping object, this must not be passed, as the keys of the
                mapping object will be used instead
        """
        strategies = self._strategy_manager.get()

        if isinstance(self._df, WrappedSparkDataFrame):
            self._dedupe_spark(attr, strategies)
        else:
            self._dedupe(attr, strategies)

        self._strategy_manager.reset()

    @property
    def strategies(self) -> None | tuple[str, ...] | dict[str, tuple[str, ...]]:
        """
        Returns the strategies currently stored in the strategy manager.

        If no strategies are stored, returns `None`. Otherwise, returns a tuple
        of strategy names or a dictionary mapping attributes to their
        respective strategies.

        Returns:
            The stored strategies, formatted
        """
        strategies = self._strategy_manager.get()
        if not strategies:
            return None

        def parse_strategies(dict_values):
            return tuple(
                [
                    (vx[0].__name__ if isinstance(vx, tuple) else vx.__class__.__name__)
                    #
                    for vx in dict_values
                ]
            )

        if "default" in strategies:
            return tuple([parse_strategies(v) for _, v in strategies.items()])[0]
        return {k: parse_strategies(v) for k, v in strategies.items()}

    @property
    def df(self) -> DataFrameLike:
        return self._df.unwrap()
Top-level entrypoint for grouping duplicates
This class handles initialisation of a dataframe, dispatching appropriately given the supported dataframe libraries (e.g. Pandas). An instance of this class can then accept a variety of strategies for deduplication and grouping.
Upon initialisation, DupeGrouper sets a new column, usually "group_id", but you can control its name by setting an environment variable GROUP_ID at runtime. The group_id is a monotonically increasing, numeric id column running from 1 to the length of the provided dataframe.
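A minimal sketch of the two behaviours described above: resolving the column name from the environment and generating ids from 1 to the number of rows. Both helpers are hypothetical, and reading the name with `os.environ.get("GROUP_ID", "group_id")` is an assumption about the mechanism.

```python
import os


def group_id_column() -> str:
    # Assumption: the column name comes from the GROUP_ID environment variable,
    # falling back to "group_id" when it is unset.
    return os.environ.get("GROUP_ID", "group_id")


def initial_group_ids(n_rows: int) -> list[int]:
    # Monotonically increasing ids from 1 up to the number of rows.
    return list(range(1, n_rows + 1))


print(group_id_column(), initial_group_ids(4))
```

In the source above, GROUP_ID is referenced as a module-level name inside `_dedupe_spark`, which suggests the variable is read when the module is imported. Setting it before importing dupegrouper is therefore the safer choice.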
def add_strategy(self, strategy: DeduplicationStrategy | tuple | StrategyMapCollection)
Add a strategy to the strategy manager.
Instances of DeduplicationStrategy or tuple are added to the "default" key. Mapping objects update the manager directly.
Arguments:
- strategy: A deduplication strategy, tuple, or strategy collection (mapping) to add.
Returns:
None; the instance's strategy manager is updated in place.
Raises:
- NotImplementedError
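add_strategy routes its argument by type using `functools.singledispatchmethod`, as the source above shows. The mock below reproduces that dispatch pattern in isolation so it can be run directly. `StrategyStoreSketch` is hypothetical, stores strategies in a plain dict instead of the library's strategy manager, and omits the DeduplicationStrategy registration.

```python
from functools import singledispatchmethod


class StrategyStoreSketch:
    """Hypothetical mock of type-based dispatch in add_strategy; not the library class."""

    def __init__(self) -> None:
        self.store: dict[str, list] = {}

    @singledispatchmethod
    def add_strategy(self, strategy):
        # Fallback for any type without a registered handler.
        raise NotImplementedError(f"Unsupported strategy: {type(strategy)}")

    @add_strategy.register(tuple)
    def _(self, strategy):
        # A (callable, kwargs) pair is stored under the "default" key.
        self.store.setdefault("default", []).append(strategy)

    @add_strategy.register(dict)
    def _(self, strategy):
        # A mapping is unpacked so each attribute keeps its own ordered list.
        for attr, strat_list in strategy.items():
            self.store.setdefault(attr, []).extend(strat_list)


def my_func(df, attr, /, **kwargs):  # hypothetical custom strategy
    return {}


sketch = StrategyStoreSketch()
sketch.add_strategy((my_func, {"match_str": "london"}))
sketch.add_strategy({"email": ["Exact", "Fuzzy"]})
print(sketch.store)
```

Dispatch uses the type of the first argument after `self`, so any unregistered type (an int, say) falls through to the base method and raises NotImplementedError.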
def dedupe(self, attr: str | None = None)
Dedupe, and group, the data based on the provided attribute.
Arguments:
- attr: The attribute to deduplicate. If strategies have been added as a mapping object, this must not be passed, as the keys of the mapping object will be used instead.
strategies: None | tuple[str, ...] | dict[str, tuple[str, ...]]
Returns the strategies currently stored in the strategy manager.
If no strategies are stored, returns None. Otherwise, returns a tuple of strategy names (for strategies added individually) or a dictionary mapping attributes to their respective strategy names (for strategies added as a mapping).
Returns:
The stored strategies, formatted
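The naming rule in the source can be reproduced on its own: custom strategies are stored as `(callable, kwargs)` tuples and reported by function name, while built-in strategies are instances and reported by class name. The sketch below is a hypothetical, simplified version that reads the "default" entry directly. `Exact` and `Fuzzy` here are empty stand-in classes, not dupegrouper's strategies.

```python
def format_strategies(store: dict[str, list]):
    """Hypothetical, simplified version of how stored strategies are reported by name."""
    if not store:
        return None  # e.g. straight after dedupe() has reset the manager

    def names(items) -> tuple[str, ...]:
        # Tuples are custom (callable, kwargs) pairs: report the function name.
        # Anything else is a strategy instance: report its class name.
        return tuple(
            item[0].__name__ if isinstance(item, tuple) else type(item).__name__ for item in items
        )

    if "default" in store:
        return names(store["default"])
    return {attr: names(items) for attr, items in store.items()}


class Exact:  # stand-in for a built-in strategy class
    pass


class Fuzzy:  # stand-in for a built-in strategy class
    pass


def my_func(df, attr, /, **kwargs):  # hypothetical custom strategy
    return {}


print(format_strategies({"default": [Exact(), (my_func, {"match_str": "london"})]}))
print(format_strategies({"address": [Exact(), Fuzzy()]}))
print(format_strategies({}))
```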