dupegrouper
A Python library for grouping duplicate data efficiently.
Introduction
dupegrouper can be used for a variety of deduplication use cases. Its intended purpose is to provide a uniform API for both exact and near deduplication, whilst collecting duplicate instances into sets, i.e. "groups".
Deduplicating data is a hard task: validating approaches takes time, and can require a lot of testing and iterating through approaches that may or may not be applicable to your dataset.
dupegrouper abstracts away the task of actually deduplicating, so that you can focus on the most important thing: implementing an appropriate "strategy" to achieve your stated end goal.
In fact, a "strategy" is key to dupegrouper's API. dupegrouper has:
- Ready-to-use deduplication strategies
- Pandas and Polars support
- A flexible API
Check out the API Documentation.
Installation
pip install dupegrouper
Example
import dupegrouper
dg = dupegrouper.DupeGrouper(df) # input dataframe
dg.add_strategy(dupegrouper.strategies.Exact())
dg.dedupe("address")
dg.df # retrieve dataframe
Usage Guide
Adding Strategies
dupegrouper comes with ready-to-use deduplication strategies, including the Exact, Fuzzy and TfIdf strategies used in the examples in this guide.
You can add these in the order you want to apply them:
# Deduplicate the address column
dg = dupegrouper.DupeGrouper(df)
dg.add_strategy(dupegrouper.strategies.Exact())
dg.add_strategy(dupegrouper.strategies.Fuzzy(tolerance=0.3))
dg.dedupe("address")
Or, add a map of strategies:
# Also deduplicates the address column
dg = dupegrouper.DupeGrouper(df)
dg.add_strategy({
    "address": [
        dupegrouper.strategies.Exact(),
        dupegrouper.strategies.Fuzzy(tolerance=0.3),
    ]
})
dg.dedupe() # No Argument!
Custom Strategies
An instance of dupegrouper.DupeGrouper can accept custom functions too.
import pandas as pd

def my_func(df: pd.DataFrame, attr: str, /, match_str: str) -> dict[str, str]:
    """deduplicates df if any given row contains `match_str`"""
    my_map = {}
    for irow, _ in df.iterrows():
        left: str = df.at[irow, attr]
        my_map[left] = left  # by default, a value maps to itself
        for jrow, _ in df.iterrows():
            right: str = df.at[jrow, attr]
            # map `left` to the first value that also contains `match_str`
            if match_str in left.lower() and match_str in right.lower():
                my_map[left] = right
                break
    return my_map
Above, my_func deserves a custom implementation: it deduplicates rows only if those rows contain the partial string match_str. You can then proceed to add your custom function as a strategy:
dg = dupegrouper.DupeGrouper(df)
dg.add_strategy((my_func, {"match_str": "london"}))
print(dg.strategies) # returns ("my_func",)
dg.dedupe("address")
Your custom function's signature must consist of two positional-only arguments followed by keyword arguments:
(df: DataFrame, attr: str, /, **kwargs) -> dict[str, str]
Where attr is the attribute you wish to deduplicate.
In the current implementation, any custom callable will also always dedupe exact matches!
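To see what kind of mapping a custom function like my_func returns, the following is a minimal sketch of the same matching logic in plain Python, with a list of strings standing in for the dataframe column. The name match_map and the sample addresses are hypothetical and exist only for illustration; how dupegrouper consumes the returned mapping internally is not shown here.

```python
def match_map(values: list[str], match_str: str) -> dict[str, str]:
    """Map each value to the first value that shares `match_str` with it."""
    mapping: dict[str, str] = {}
    for left in values:
        mapping[left] = left  # default: a value maps to itself
        if match_str not in left.lower():
            continue
        for right in values:
            if match_str in right.lower():
                mapping[left] = right  # first matching value wins
                break
    return mapping


# Hypothetical sample data
addresses = ["1 London Rd", "Flat 2, LONDON", "9 Paris Ave"]
result = match_map(addresses, "london")
for value, target in result.items():
    print(f"{value!r} -> {target!r}")
```

Both addresses containing "london" map to the same target ("1 London Rd"), while the remaining address maps to itself.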
Creating a Comprehensive Strategy
You can use the above techniques for a comprehensive strategy to deduplicate your data:
import dupegrouper
import pandas as pd  # or polars
df = pd.read_csv("example.csv")
dg = dupegrouper.DupeGrouper(df)
strategies = {
    "address": [
        dupegrouper.strategies.Exact(),
        dupegrouper.strategies.Fuzzy(tolerance=0.5),
        (my_func, {"match_str": "london"}),
    ],
    "email": [
        dupegrouper.strategies.Exact(),
        dupegrouper.strategies.Fuzzy(tolerance=0.3),
        dupegrouper.strategies.TfIdf(tolerance=0.4, ngram=3, topn=2),
    ],
}
dg.add_strategy(strategies)
dg.dedupe()
df = dg.df
Extending the API for Custom Implementations
For simple custom implementations, it's recommended that you use the approach discussed for custom functions (see Custom Strategies).
However, you can derive directly from the abstract base class dupegrouper.strategy.DeduplicationStrategy, and thus make direct use of the efficient, core deduplication methods implemented in this library, as described in its API. This will expose a dedupe() method, ready for direct use within an instance of DupeGrouper, in much the same way that other dupegrouper.strategies are passed in as strategies.
About
License
This project is licensed under the Apache-2.0 License. See the LICENSE file for more details.
class DupeGrouper:
    """Top-level entrypoint for grouping duplicates

    This class handles initialisation of a dataframe, dispatching appropriately
    given the supported dataframe libraries (e.g. Pandas). An instance of this
    class can then accept a variety of strategies for deduplication and
    grouping.

    Upon initialisation, `DupeGrouper` sets a new column, usually `"group_id"`,
    but you can control this by setting an environment variable `GROUP_ID` at
    runtime. The group_id is a linearly increasing numeric id column, running
    from 1 to the length of the dataframe provided.
    """

    def __init__(self, df: pd.DataFrame):
        self._df = _InitDataFrame(df).choose
        self._strategy_manager = _StrategyManager()

    @singledispatchmethod
    def _call_strategy_deduper(
        self,
        strategy: DeduplicationStrategy | tuple[typing.Callable, typing.Any],
        attr: str,
    ):
        """Dispatch the appropriate strategy deduplication method.

        If the strategy is an instance of a dupegrouper `DeduplicationStrategy`
        the strategy will have been added as such, with its parameters. In the
        case of a custom implementation of a Callable, passed as a tuple, we
        pass this *directly* to the `Custom` class and initialise that.

        Args:
            strategy: A `dupegrouper` deduplication strategy or a tuple
                containing a (custom) callable and its parameters.
            attr: The attribute used for deduplication.

        Returns:
            A deduplicated dataframe

        Raises:
            NotImplementedError.
        """
        del attr  # Unused
        # raise (not return) the error, as documented under "Raises"
        raise NotImplementedError(f"Unsupported strategy: {type(strategy)}")

    @_call_strategy_deduper.register(DeduplicationStrategy)
    def _(self, strategy, attr):
        return strategy._set_df(self._df).dedupe(attr)

    @_call_strategy_deduper.register(tuple)
    def _(self, strategy: tuple[typing.Callable, typing.Any], attr):
        func, kwargs = strategy
        return Custom(func, attr, **kwargs)._set_df(self._df).dedupe()

    @singledispatchmethod
    def _dedupe(
        self,
        attr: str | None,
        strategy_collection: strategy_map_collection,
    ):
        """Dispatch the appropriate deduplication logic.

        If strategies have been added individually, they are stored under a
        "default" key and retrieved as such when the public `.dedupe` method is
        called _with_ the attribute label. In the case of having added
        strategies in one go with a direct dict (mapping) object, the attribute
        label is first extracted from strategy collection dictionary keys.
        Upon completing deduplication the strategy collection is wiped for
        (any) subsequent deduplication.

        Args:
            attr: The attribute used for deduplication; or None in the case
                of strategies being a mapping object

        Returns:
            None; internal `_df` attribute is updated.

        Raises:
            NotImplementedError.
        """
        del strategy_collection  # Unused
        raise NotImplementedError(f"Unsupported type: {type(attr)}")

    @_dedupe.register(str)
    def _(self, attr, strategy_collection):
        for strategy in strategy_collection["default"]:
            self._df = self._call_strategy_deduper(strategy, attr)
        self._strategy_manager.reset()

    @_dedupe.register(NoneType)
    def _(self, attr, strategy_collection):
        del attr  # Unused
        for attr, strategies in strategy_collection.items():
            for strategy in strategies:
                self._df = self._call_strategy_deduper(strategy, attr)
        self._strategy_manager.reset()

    # PUBLIC API:

    @singledispatchmethod
    def add_strategy(self, strategy: DeduplicationStrategy | tuple | strategy_map_collection):
        """
        Add a strategy to the strategy manager.

        Instances of `DeduplicationStrategy` or tuple are added to the
        "default" key. Mapping objects update the manager directly

        Args:
            strategy: A deduplication strategy, tuple, or strategy collection
                (mapping) to add.

        Returns:
            self is updated

        Raises:
            NotImplementedError
        """
        # raise (not return) the error, as documented under "Raises"
        raise NotImplementedError(f"Unsupported strategy: {type(strategy)}")

    @add_strategy.register(DeduplicationStrategy)
    @add_strategy.register(tuple)
    def _(self, strategy):
        self._strategy_manager.add("default", strategy)

    @add_strategy.register(dict)
    def _(self, strategy: strategy_map_collection):
        for attr, strat_list in strategy.items():
            for strat in strat_list:
                self._strategy_manager.add(attr, strat)

    def dedupe(self, attr: str | None = None):
        """dedupe, and group, the data based on the provided attribute

        Args:
            attr: The attribute to deduplicate. If strategies have been added as
                a mapping object, this must not be passed, as the keys of the
                mapping object will be used instead
        """
        self._dedupe(attr, self._strategy_manager.get())

    @property
    def strategies(self) -> None | tuple[str, ...] | dict[str, tuple[str, ...]]:
        """
        Returns the strategies currently stored in the strategy manager.

        If no strategies are stored, returns `None`. Otherwise, returns a tuple
        of strategy names or a dictionary mapping attributes to their
        respective strategies.

        Returns:
            The stored strategies, formatted
        """
        strategies = self._strategy_manager.get()
        if not strategies:
            return None

        def parse_strategies(dict_values):
            return tuple([(vx[0].__name__ if isinstance(vx, tuple) else vx.__class__.__name__) for vx in dict_values])

        if "default" in strategies:
            return tuple([parse_strategies(v) for _, v in strategies.items()])[0]
        return {k: parse_strategies(v) for k, v in strategies.items()}

    @property
    def df(self) -> pd.DataFrame:
        return self._df
Top-level entrypoint for grouping duplicates
This class handles initialisation of a dataframe, dispatching appropriately given the supported dataframe libraries (e.g. Pandas). An instance of this class can then accept a variety of strategies for deduplication and grouping.
Upon initialisation, DupeGrouper sets a new column, usually "group_id", but you can control this by setting an environment variable GROUP_ID at runtime. The group_id is a linearly increasing numeric id column, running from 1 to the length of the dataframe provided.
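To make the group_id description concrete, here is a minimal sketch in plain Python (no dataframe library) of a 1-based id column, and of how exact duplicates could then be collapsed into groups. The helpers init_group_ids and group_exact are hypothetical names, and letting the first-seen id win is an assumption made for illustration, not a statement about dupegrouper's internals. Only the GROUP_ID environment variable and the "group_id" default come from the description above.

```python
import os

# Column name: "group_id" by default, overridable via the GROUP_ID
# environment variable (as described in the class docstring).
GROUP_ID_COLUMN = os.environ.get("GROUP_ID", "group_id")


def init_group_ids(n_rows: int) -> list[int]:
    """Return a linearly increasing id per row, from 1 to n_rows."""
    return list(range(1, n_rows + 1))


def group_exact(values: list[str], group_ids: list[int]) -> list[int]:
    """Give rows with identical values the id of their first occurrence.

    Assumption for illustration: the first-seen id wins.
    """
    first_seen: dict[str, int] = {}
    return [first_seen.setdefault(value, gid) for value, gid in zip(values, group_ids)]


# Hypothetical sample data
addresses = ["1 High St", "2 Low Rd", "1 High St", "3 Mid Ln", "2 Low Rd"]
ids = init_group_ids(len(addresses))
grouped_ids = group_exact(addresses, ids)
print(ids)          # [1, 2, 3, 4, 5]
print(grouped_ids)  # [1, 2, 1, 4, 2]
```

Rows that share an id after grouping form one "group" of duplicates.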
    @singledispatchmethod
    def add_strategy(self, strategy: DeduplicationStrategy | tuple | strategy_map_collection):
        """
        Add a strategy to the strategy manager.

        Instances of `DeduplicationStrategy` or tuple are added to the
        "default" key. Mapping objects update the manager directly

        Args:
            strategy: A deduplication strategy, tuple, or strategy collection
                (mapping) to add.

        Returns:
            self is updated

        Raises:
            NotImplementedError
        """
        # raise (not return) the error, as documented under "Raises"
        raise NotImplementedError(f"Unsupported strategy: {type(strategy)}")
Add a strategy to the strategy manager.
Instances of DeduplicationStrategy or tuple are added to the "default" key. Mapping objects update the manager directly.
Arguments:
- strategy: A deduplication strategy, tuple, or strategy collection (mapping) to add.
Returns:
self is updated
Raises:
- NotImplementedError
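The type-based dispatch described here can be sketched with functools.singledispatchmethod from the standard library. The class StrategyStore below is a hypothetical stand-in written for illustration, not dupegrouper's strategy manager, and the strings "Exact" and "Fuzzy" are placeholders for strategy instances.

```python
from collections import defaultdict
from functools import singledispatchmethod


class StrategyStore:
    """Hypothetical stand-in for a strategy manager keyed by attribute."""

    def __init__(self) -> None:
        self._strategies = defaultdict(list)

    @singledispatchmethod
    def add_strategy(self, strategy):
        # Fallback for any type without a registered handler.
        raise NotImplementedError(f"Unsupported strategy: {type(strategy)}")

    @add_strategy.register(tuple)
    def _(self, strategy):
        # A (callable, kwargs) tuple is stored under the "default" key.
        self._strategies["default"].append(strategy)

    @add_strategy.register(dict)
    def _(self, strategy):
        # A mapping is stored per attribute, preserving insertion order.
        for attr, strat_list in strategy.items():
            self._strategies[attr].extend(strat_list)

    def get(self) -> dict:
        return dict(self._strategies)


def my_func(df, attr, /, **kwargs):
    """Placeholder custom callable."""
    return {}


store = StrategyStore()
store.add_strategy((my_func, {"match_str": "london"}))
store.add_strategy({"email": ["Exact", "Fuzzy"]})  # placeholder strategies
print(sorted(store.get()))  # ['default', 'email']
```

Passing a type with no registered handler, such as an integer, falls through to the base method and raises NotImplementedError.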
    def dedupe(self, attr: str | None = None):
        """dedupe, and group, the data based on the provided attribute

        Args:
            attr: The attribute to deduplicate. If strategies have been added as
                a mapping object, this must not be passed, as the keys of the
                mapping object will be used instead
        """
        self._dedupe(attr, self._strategy_manager.get())
Dedupe, and group, the data based on the provided attribute.
Arguments:
- attr: The attribute to deduplicate. If strategies have been added as a mapping object, this must not be passed, as the keys of the mapping object will be used instead.
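The two calling conventions (an attribute name versus no argument) can be sketched as a dispatch on the type of attr. The Runner class below is a hypothetical illustration that only records which strategy would run on which attribute; the strategy names are placeholder strings, and clearing the collection after a run mirrors the reset described in the source above.

```python
from functools import singledispatchmethod


class Runner:
    """Hypothetical sketch: dispatch on whether `attr` is a str or None."""

    def __init__(self, strategies: dict) -> None:
        self.strategies = strategies
        self.log = []  # (attribute, strategy) pairs, in application order

    @singledispatchmethod
    def _dedupe(self, attr):
        raise NotImplementedError(f"Unsupported type: {type(attr)}")

    @_dedupe.register(str)
    def _(self, attr):
        # Strategies added one by one live under the "default" key.
        for strategy in self.strategies["default"]:
            self.log.append((attr, strategy))
        self.strategies = {}  # wiped after use

    @_dedupe.register(type(None))
    def _(self, attr):
        # With a mapping, the attribute names come from its keys.
        for key, strategies in self.strategies.items():
            for strategy in strategies:
                self.log.append((key, strategy))
        self.strategies = {}  # wiped after use

    def dedupe(self, attr=None):
        self._dedupe(attr)


by_name = Runner({"default": ["Exact", "Fuzzy"]})
by_name.dedupe("address")
print(by_name.log)  # [('address', 'Exact'), ('address', 'Fuzzy')]

by_map = Runner({"address": ["Exact"], "email": ["Exact", "TfIdf"]})
by_map.dedupe()
print(by_map.log)  # [('address', 'Exact'), ('email', 'Exact'), ('email', 'TfIdf')]
```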
    @property
    def strategies(self) -> None | tuple[str, ...] | dict[str, tuple[str, ...]]:
        """
        Returns the strategies currently stored in the strategy manager.

        If no strategies are stored, returns `None`. Otherwise, returns a tuple
        of strategy names or a dictionary mapping attributes to their
        respective strategies.

        Returns:
            The stored strategies, formatted
        """
        strategies = self._strategy_manager.get()
        if not strategies:
            return None

        def parse_strategies(dict_values):
            return tuple([(vx[0].__name__ if isinstance(vx, tuple) else vx.__class__.__name__) for vx in dict_values])

        if "default" in strategies:
            return tuple([parse_strategies(v) for _, v in strategies.items()])[0]
        return {k: parse_strategies(v) for k, v in strategies.items()}
Returns the strategies currently stored in the strategy manager.
If no strategies are stored, returns None. Otherwise, returns a tuple of strategy names or a dictionary mapping attributes to their respective strategies.
Returns:
The stored strategies, formatted
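As a worked illustration of the three possible return shapes, the following sketch reimplements the name formatting as a standalone function. The function describe and the empty classes Exact and Fuzzy are hypothetical stand-ins defined here so the example runs without dupegrouper installed; unlike the property above, the sketch looks up the "default" key directly.

```python
class Exact:
    """Stand-in for a strategy class (illustration only)."""


class Fuzzy:
    """Stand-in for a strategy class (illustration only)."""


def my_func(df, attr, /, **kwargs):
    """Placeholder custom callable."""
    return {}


def describe(strategies: dict):
    """Format stored strategies as names: None, a tuple, or a dict of tuples."""
    if not strategies:
        return None

    def names(items):
        # Custom callables arrive as (func, kwargs) tuples; others are instances.
        return tuple(
            item[0].__name__ if isinstance(item, tuple) else type(item).__name__
            for item in items
        )

    if "default" in strategies:
        return names(strategies["default"])
    return {attr: names(items) for attr, items in strategies.items()}


print(describe({}))
# None
print(describe({"default": [Exact(), (my_func, {"match_str": "london"})]}))
# ('Exact', 'my_func')
print(describe({"email": [Exact(), Fuzzy()]}))
# {'email': ('Exact', 'Fuzzy')}
```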