# A Practical Guide To Entity Resolution in Python (No Database, No Machine Learning)

> Source: <https://dev.to/prithwish_nath/a-practical-guide-to-entity-resolution-in-python-no-database-no-machine-learning-3pnl>
> Published: 2026-05-26 07:36:44+00:00

**TL;DR:** Learn a very simple way to normalize, dedupe, and fuzzy-match records that refer to the same real-world entity in Python, without a database or any ML pipelines.

I was working on a Crunchbase dataset last Friday. I joined it against our CRM, and got 56 hits out of 96. The other 40 were sitting *right there* in both tables — `Necker FinTech`

in the extracted data was`Necker FinTech Holdings Inc.`

in the CRM; `Investing.com`

in the data was`Fusion Media Limited`

in the CRM — but `JOIN ... ON name = name`

obviously doesn't care, it will shrug and return nothing. If I'd shipped that, some sales rep would end up cold-pitching an existing customer because of it. 😅

[This is the core problem of entity resolution](https://en.wikipedia.org/wiki/Record_linkage): the same real-world entity wearing different names in different systems. Naive text equality checks are borderline useless in the real world. I’d been meaning to do something less embarrassing than a raw `==`

for a while, so I spent the rest of the weekend on a simple pipeline — scrape company names from Crunchbase hubs via [Bright Data](https://get.brightdata.com/bd7914?utm_content=a_practical_guide_to_entity_resolution_in_python_no_database_no_machine_learning), normalize, deduplicate, and fuzzy-match against the CRM list using [RapidFuzz](https://github.com/rapidfuzz/RapidFuzz) (`fuzz.WRatio`

). **Deliberately choosing to NOT use ML, vector embeddings, or a database.**

The join rate on this dataset jumped from **~58% to 100%**.

| Metric | Exact (normalized string) | Fuzzy (WRatio ≥ 90) |
|---|---|---|
| Scraped hub rows → CRM | 58.3% (56 / 96) |
100% (96 / 96) |
| CRM rows → scraped data | 34.8% (48 / 138) |
100% (138 / 138) |

The reason exact matching loses so badly is that *any* real CRM list you’re handed will almost always have multiple legal-name variants per company — I had three different *Necker* spellings pointing at one hub listing alone. Fuzzy matching earns its keep by collapsing those variants back into a single canonical cluster, and that’s most of what the rest of this post is about.

I’ll walk through it; I hope it’s useful for anyone starting with fuzzy algorithms!

**Entity resolution** matches records that describe the same company under different surface strings.

**If you use exact matching, you ask**: *are these two strings identical?* After you lowercase and strip punctuation, `"Necker FinTech"`

and `"Necker FinTech Holdings Inc."`

are still different strings — so a SQL `JOIN`

or a Python `==`

check will incorrectly say no match.

```
-- Exact join on raw names returns no row when spellings differ  
SELECT h.company_name AS hub_name, c.company_name AS crm_name  
FROM   hub_scrape h  
JOIN   crm_accounts c ON c.company_name = h.company_name  
WHERE  h.company_name = 'Necker FinTech';  
-- This will return 0 rows   
-- Remember, CRM has "Necker FinTech Holdings Inc.", not the Crunchbase title
```

**This is why you use Fuzzy matching.** That asks a looser question: *how similar are these two strings*? You get a score — usually 0 to 100 — instead of `true`

or `false`

. Names that are clearly the same company but spelled differently (`Necker FinTech`

vs `Necker FinTech Holdings Inc.`

) will score high, while unrelated names will score low. You pick a **threshold** (we use 90): if the score is at or above it, you treat the pair as a match; otherwise you don't.

``` php
from rapidfuzz import fuzz  
THRESHOLD = 90  
def is_match(a: str, b: str) -> bool:  
    return fuzz.WRatio(a, b) >= THRESHOLD  
pairs = [  
    ("Necker FinTech", "Necker FinTech Holdings Inc."),   # same company, legal suffix  
    ("PointsKash", "Points Kash"),                        # same company, spacing  
    ("Investing.com", "Fusion Media Limited"),            # brand vs legal entity  
    ("Stripe", "Climate Corp"),                           # different companies  
]  
for a, b in pairs:  
    score = fuzz.WRatio(a, b)  
    print(f"{score:5.1f}  match={score >= THRESHOLD!s:5}  {a!r}  vs  {b!r}")
```

This is the same scoring logic we’ll use for the rest of the tutorial, so `pip install rapidfuzz`

is all you need to follow along.

Running the demo pairs above with `fuzz.WRatio`

and **WRatio threshold 90** yields:

| Pair | WRatio | Match at ≥ 90? | Drift type |
|---|---|---|---|
`Necker FinTech` vs `Necker FinTech Holdings Inc.`
|
90.0 | Yes | Legal suffix |
`PointsKash` vs `Points Kash`
|
95.2 | Yes | Token spacing |
`Investing.com` vs `Fusion Media Limited`
|
30.0 | No | Brand vs legal entity |
`Stripe` vs `Climate Corp`
|
45.0 | No | Unrelated companies |

Think of it like a strict spell-check or a “did you mean X?” suggestion, but for whole company names. **It is not machine learning — no model is trained on your data.** The library compares characters and words using fixed rules: how many edits to turn one string into another, whether one name is contained in the other, whether the same words appear in a different order. That’s why it’s fast, easy to audit, and good enough for a large class of real-world messiness — extra words, `Inc.`

vs `LLC`

, odd spacing, punctuation.

💡If two names share almost no letters —Investing.comandFusion Media Limitedfor example — the score stays low and fuzzy matching correctly refuses to merge them. Those cases need a real identifier (domain, LEI, enrichment API, some sort of ML pipeline etc.), not smarter string math.

Here’s a quick summary.

| Approach | Best when | Used in this pipeline? |
|---|---|---|
Fuzzy matching (RapidFuzz WRatio) |
Same entity, stylistic drift — legal suffixes, spacing, punctuation | Yes — primary method |
Lookup table / enrichment API |
Brand vs legal name; names share almost no tokens | Partial — `RESEARCHED` dict in `build_sample_crm.py`
|
| (GLEIF, Clearbit, domain) | ||
ML record linkage (Dedupe, Splink) |
Large-scale probabilistic linkage, many fields beyond name | No — names-only, no training step |

Basically, choose fuzzy matching when two name strings likely describe the same company but spell it differently.

Only choose a lookup or enrichment layer when the strings are *related* entities (brand vs operator) rather than variants of one name.

**Entity resolution in this pipeline** is a fetch → extract → normalize → fuzzy-cluster → join loop on `canonical_id`

.

```
hub_urls.json  
      │  
      ▼  
fetch_hubs.py ──calls──► bright_data_unlocker.py     Bright Data POST → page body (markdown/HTML)  
      │                           │  
      └──calls──► parse_hubs.py ◄─┘                  regex → org slug + display name  
      │  
      ▼  
hub_snapshot.json                                    (+ cached bodies in data/hub_responses/)  

extract.py ──► raw_records.json                      flat table  

reconcile.py ──► reconciled.json                     canonical clusters + aliases  

run_fuzzy.py                                         CLI part. This just runs extract + reconcile   

── optional eval ──  
post_fuzzy_eval.py                                   All done, so run a real-world test, calc metrics, then print to stdout
```

**Each stage is a pure transform: JSON in, JSON out.** Nothing stateful, nothing that requires a running service, and nothing I can't `git diff`

between runs.

I’m scraping four Crunchbase hub leaderboard pages, defined in a `hub_urls.json`

:

```
[  
  { "category": "fintech",                "url": "https://www.crunchbase.com/hub/fintech-companies-seed-funding" },  
  { "category": "cybersecurity",          "url": "https://www.crunchbase.com/hub/cyber-security-startups" },  
  { "category": "saas",                   "url": "https://www.crunchbase.com/hub/saas-companies-seed-funding" },  
  { "category": "artificial_intelligence","url": "https://www.crunchbase.com/hub/artificial-intelligence-companies-early-stage-venture-funding" }  
]
```

Replace with your own, obviously.

Crunchbase is a JavaScript-heavy SPA — it won’t respond to a plain `requests.get`

. So before we fetch, I use [Bright Data's Web Unlocker](https://get.brightdata.com/bd-web-unlocker?utm_content=a_practical_guide_to_entity_resolution_in_python_no_database_no_machine_learning), which handles JS rendering and anti-bot for me.

[Sign up here -- Automated Web Unblocker](https://get.brightdata.com/bd-web-unlocker?utm_content=a_practical_guide_to_entity_resolution_in_python_no_database_no_machine_learning&source=post_page-----89d55badaeac---------------------------------------)

I set up a reusable client for this, and this is just a thin wrapper around their single `POST`

endpoint `https://api.brightdata.com/request`

. Make sure you’ve signed up, and have these set in your .env file first:

```
BRIGHTDATA_API_TOKEN=your_api_token  
BRIGHTDATA_ZONE=your_web_unlocker_zone_name
```

**bright_data_unlocker.py**

``` python
"""Fetch hub/listing pages as HTML or markdown."""
from __future__ import annotations

import json
import os
import time
from typing import Any, Dict, Literal, Optional

import requests
from dotenv import load_dotenv

load_dotenv()

ContentFormat = Literal["html", "markdown"]

class BrightDataUnlockerClient:
    """POST https://api.brightdata.com/request (Web Unlocker zone)."""

    def __init__(
        self,
        api_key: Optional[str] = None,
        zone: Optional[str] = None,
        country: Optional[str] = None,
    ):
        self.api_key = api_key or os.getenv("BRIGHT_DATA_API_KEY")
        self.zone = zone or os.getenv("BRIGHT_DATA_UNLOCKER_ZONE")
        self.country = country or os.getenv("BRIGHT_DATA_COUNTRY") # optional
        self.api_endpoint = "https://api.brightdata.com/request"

        if not self.api_key:
            raise ValueError("BRIGHT_DATA_API_KEY is required.")
        if not self.zone:
            raise ValueError(
                "BRIGHT_DATA_UNLOCKER_ZONE is required. "
                "Create a Web Unlocker API zone in Bright Data."
            )

        self.session = requests.Session()
        self.session.headers.update(
            {
                "Content-Type": "application/json",
                "Authorization": f"Bearer {self.api_key}",
            }
        )

    def fetch(
        self,
        url: str,
        *,
        content_format: ContentFormat = "markdown",
        max_retries: int = 2,
    ) -> str:
        """Fetch page body. markdown => format=raw + data_format=markdown (Bright Data)."""
        last_err: Optional[Exception] = None
        for attempt in range(max_retries + 1):
            try:
                return self._do_fetch(url, content_format=content_format)
            except Exception as e:
                last_err = e
                if attempt < max_retries:
                    time.sleep(0.5 * (attempt + 1))
        assert last_err is not None
        raise last_err

    def fetch_markdown(self, url: str, max_retries: int = 2) -> str:
        return self.fetch(url, content_format="markdown", max_retries=max_retries)

    def fetch_html(self, url: str, max_retries: int = 2) -> str:
        return self.fetch(url, content_format="html", max_retries=max_retries)

    def _do_fetch(self, url: str, *, content_format: ContentFormat) -> str:
        payload: Dict[str, Any] = {
            "zone": self.zone,
            "url": url,
            "format": "raw",
        }
        if content_format == "markdown":
            payload["data_format"] = "markdown"
        if self.country:
            payload["country"] = self.country

        response = self.session.post(self.api_endpoint, json=payload, timeout=120)
        response.raise_for_status()

        try:
            result = response.json()
        except json.JSONDecodeError:
            # data_format=markdown often returns the page body directly, not a JSON envelope
            text = response.text
            if not text.strip():
                raise RuntimeError("Bright Data Unlocker empty response body")
            return text

        if not isinstance(result, dict):
            raise RuntimeError(f"Bright Data unexpected response type: {type(result)}")

        inner_status = result.get("status_code")
        if inner_status is not None and inner_status != 200:
            raise RuntimeError(f"Bright Data Unlocker status_code={inner_status}")

        body = result.get("body")
        if body is None:
            if "status_code" in result and result.get("status_code") == 200:
                raise RuntimeError("Bright Data Unlocker empty body")
            raise RuntimeError(f"Bright Data Unlocker missing body: {list(result.keys())}")

        if isinstance(body, str):
            if body.strip().startswith("{"):
                try:
                    nested = json.loads(body)
                    if isinstance(nested, dict) and "body" in nested:
                        body = nested["body"]
                except json.JSONDecodeError:
                    pass
            if not str(body).strip():
                raise RuntimeError("Bright Data Unlocker empty body string")
            return str(body)
        if isinstance(body, dict):
            return json.dumps(body)
        return str(body)
```

Note how we can request`data_format=markdown`

. Using this param, Bright Data returns a sanitized markdown rendering of the page, which is *much* easier to parse with regex than raw HTML.

💡 If markdown still yields zero orgs for a hub, fetch_hubs.py --fallback-html can fetch or use cached HTML and run the HTML parser instead.

With that in place, here’s our actual fetch script — `fetch_hubs.py`

**fetch_hubs.py**

```
"""Fetch Crunchbase hub pages via Bright Data Web Unlocker; write hub_snapshot.json."""

from __future__ import annotations

import argparse
import json
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

from dotenv import load_dotenv

from bright_data_unlocker import BrightDataUnlockerClient, ContentFormat
from parse_hubs import parse_organizations

load_dotenv()

_ROOT = Path(__file__).resolve().parent
_DEFAULT_RESPONSES_DIR = _ROOT / "data" / "hub_responses"

def load_hub_urls(path: Path) -> List[Dict[str, str]]:
    raw = json.loads(path.read_text(encoding="utf-8"))
    if not isinstance(raw, list):
        raise ValueError("hub_urls.json must be a JSON array")
    out: List[Dict[str, str]] = []
    for item in raw:
        if not isinstance(item, dict):
            continue
        url = (item.get("url") or "").strip()
        category = (item.get("category") or "unknown").strip()
        if url:
            out.append({"category": category, "url": url})
    return out

def _response_file(category: str, content_format: ContentFormat) -> str:
    ext = "md" if content_format == "markdown" else "html"
    safe = "".join(c if c.isalnum() or c in "-_" else "_" for c in category)
    return f"{safe}.{ext}"

def _response_path(
    responses_dir: Path, category: str, content_format: ContentFormat
) -> Path:
    return responses_dir / _response_file(category, content_format)

def load_cached_body(
    responses_dir: Path, category: str, content_format: ContentFormat
) -> Optional[str]:
    path = _response_path(responses_dir, category, content_format)
    if not path.is_file() or path.stat().st_size == 0:
        return None
    return path.read_text(encoding="utf-8")

def save_response_body(
    responses_dir: Path,
    category: str,
    hub_url: str,
    content_format: ContentFormat,
    body: str,
) -> Path:
    responses_dir.mkdir(parents=True, exist_ok=True)
    path = _response_path(responses_dir, category, content_format)
    path.write_text(body, encoding="utf-8")
    return path

def _manifest_path(responses_dir: Path) -> Path:
    return responses_dir / "manifest.json"

def _load_manifest(responses_dir: Path) -> Dict[str, Any]:
    path = _manifest_path(responses_dir)
    if not path.is_file():
        return {"hubs": []}
    return json.loads(path.read_text(encoding="utf-8"))

def _upsert_manifest_entry(
    responses_dir: Path,
    category: str,
    hub_url: str,
    content_format: ContentFormat,
    response_path: Path,
    *,
    fetched_at: str,
) -> None:
    entry = {
        "category": category,
        "hub_url": hub_url,
        "content_format": content_format,
        "response_file": response_path.name,
        "fetched_at": fetched_at,
    }
    manifest = _load_manifest(responses_dir)
    hubs = [h for h in manifest.get("hubs") or [] if h.get("category") != category]
    hubs.append(entry)
    manifest["hubs"] = hubs
    manifest["updated_at"] = datetime.now(timezone.utc).isoformat()
    _manifest_path(responses_dir).write_text(
        json.dumps(manifest, indent=2, ensure_ascii=False) + "\n",
        encoding="utf-8",
    )

def _parse_body(
    body: str,
    hub_url: str,
    content_format: ContentFormat,
    max_orgs: int,
) -> List[Dict[str, Any]]:
    return parse_organizations(body, hub_url, content_format=content_format, max_orgs=max_orgs)

def main() -> None:
    ap = argparse.ArgumentParser(
        description="Fetch Crunchbase hub pages (Web Unlocker) and extract organization URLs.",
    )
    ap.add_argument("--hubs-json", type=Path, default=_ROOT / "hub_urls.json")
    ap.add_argument("--out", type=Path, default=_ROOT / "data" / "hub_snapshot.json")
    ap.add_argument(
        "--format",
        choices=("markdown", "html"),
        default="markdown",
    )
    ap.add_argument("--max-orgs-per-hub", type=int, default=80)
    ap.add_argument("--delay", type=float, default=1.0)
    ap.add_argument(
        "--responses-dir",
        type=Path,
        default=_DEFAULT_RESPONSES_DIR,
        help="Directory for cached raw hub page bodies (default: data/hub_responses).",
    )
    ap.add_argument(
        "--refetch",
        action="store_true",
        help="Call Bright Data even if a cached response file exists.",
    )
    ap.add_argument(
        "--parse-only",
        action="store_true",
        help="Parse cached responses only; never call Bright Data.",
    )
    ap.add_argument(
        "--fallback-html",
        action="store_true",
        help="If markdown parse finds 0 orgs, try cached or fetched HTML.",
    )
    args = ap.parse_args()

    responses_dir = args.responses_dir

    hubs = load_hub_urls(args.hubs_json)
    if not hubs:
        raise SystemExit("No hubs in hub_urls.json")

    args.out.parent.mkdir(parents=True, exist_ok=True)
    client: Optional[BrightDataUnlockerClient] = None
    if not args.parse_only:
        client = BrightDataUnlockerClient()

    content_format: ContentFormat = args.format

    payload: Dict[str, Any] = {
        "fetched_at": datetime.now(timezone.utc).isoformat(),
        "source": "bright_data_web_unlocker",
        "content_format": content_format,
        "responses_dir": str(responses_dir),
        "hubs": [],
    }

    n_hubs = len(hubs)
    for i, hub in enumerate(hubs, start=1):
        category = hub["category"]
        url = hub["url"]
        print(f"\n[{i}/{n_hubs}] hub [{category}]: starting...", flush=True)
        block: Dict[str, Any] = {
            "category": category,
            "hub_url": url,
            "error": None,
            "organic_count": 0,
            "rows": [],
            "response_file": _response_file(category, content_format),
        }
        parse_format: ContentFormat = content_format

        try:
            body: Optional[str] = None
            if not args.refetch:
                body = load_cached_body(responses_dir, category, content_format)

            if body is None:
                if args.parse_only:
                    raise FileNotFoundError(
                        f"no cached response at {_response_path(responses_dir, category, content_format)} "
                        "(run without --parse-only to fetch)"
                    )
                print(
                    f"[{i}/{n_hubs}] hub [{category}]: fetching ({content_format})...",
                    flush=True,
                )
                assert client is not None
                body = client.fetch(url, content_format=content_format)
                print(
                    f"[{i}/{n_hubs}] hub [{category}]: fetch done "
                    f"({len(body):,} chars)",
                    flush=True,
                )
                saved = save_response_body(
                    responses_dir, category, url, content_format, body
                )
                _upsert_manifest_entry(
                    responses_dir,
                    category,
                    url,
                    content_format,
                    saved,
                    fetched_at=datetime.now(timezone.utc).isoformat(),
                )
                print(f"[{i}/{n_hubs}] hub [{category}]: saved {saved}", flush=True)
            else:
                print(
                    f"[{i}/{n_hubs}] hub [{category}]: using cache "
                    f"{_response_path(responses_dir, category, content_format)}",
                    flush=True,
                )

            print(f"[{i}/{n_hubs}] hub [{category}]: parsing...", flush=True)
            rows = _parse_body(body, url, parse_format, args.max_orgs_per_hub)

            if not rows and args.fallback_html and parse_format == "markdown":
                html_body = load_cached_body(responses_dir, category, "html")
                if html_body is None and not args.parse_only:
                    print(
                        f"[{i}/{n_hubs}] hub [{category}]: markdown had 0 orgs, "
                        "fetching HTML...",
                        flush=True,
                    )
                    assert client is not None
                    html_body = client.fetch(url, content_format="html")
                    print(
                        f"[{i}/{n_hubs}] hub [{category}]: HTML fetch done "
                        f"({len(html_body):,} chars)",
                        flush=True,
                    )
                    saved = save_response_body(
                        responses_dir, category, url, "html", html_body
                    )
                    print(f"[{i}/{n_hubs}] hub [{category}]: saved {saved}", flush=True)
                elif html_body is None:
                    raise FileNotFoundError(
                        f"no cached HTML at {_response_path(responses_dir, category, 'html')}"
                    )
                else:
                    print(
                        f"[{i}/{n_hubs}] hub [{category}]: markdown had 0 orgs, "
                        "using cached HTML...",
                        flush=True,
                    )
                print(f"[{i}/{n_hubs}] hub [{category}]: parsing HTML...", flush=True)
                rows = _parse_body(html_body, url, "html", args.max_orgs_per_hub)
                parse_format = "html"
                block["response_file"] = _response_file(category, "html")

            block["content_format"] = parse_format
            block["organic_count"] = len(rows)
            block["rows"] = rows
            print(
                f"[{i}/{n_hubs}] hub [{category}]: done - "
                f"{len(rows)} organizations",
                flush=True,
            )

        except Exception as e:
            print(f"[{i}/{n_hubs}] hub [{category}]: failed - {e}", flush=True)
            block["error"] = str(e)

        payload["hubs"].append(block)
        if not args.parse_only:
            time.sleep(args.delay)

    args.out.write_text(
        json.dumps(payload, indent=2, ensure_ascii=False) + "\n",
        encoding="utf-8",
    )
    total = sum(h.get("organic_count") or 0 for h in payload["hubs"])
    print(
        f"\nAll hubs processed. Wrote {args.out} "
        f"({total} organizations across {n_hubs} hubs).",
        flush=True,
    )

if __name__ == "__main__":
    main()
```

Note how I’m caching the raw bodies under `data/hub_responses/`

so re-runs with `--parse-only`

don't burn any API credits.

Our `parse_hubs.py`

pulls organization slugs and display names out of the cached page bodies from the previous step. It runs three regex patterns in priority order:

```
# parse_hubs.py  

# Priority 1: Bright Data relative markdown links
# Matches: ](/organization/slug "Display Name")
_ORG_REL_LINK = re.compile(
    r"\]\(/organization/([a-z0-9_-]+)(?:\s+\"([^\"]*)\")?\s*\)",
    re.I,
)

# Priority 2: Standard absolute markdown links
# Matches: [Company Name](https://www.crunchbase.com/organization/slug)
_ORG_MD_LINK = re.compile(
    r"\[([^\]]+)\]\(\s*<?https?://[^>\s)]*crunchbase.com/organization/([a-z0-9_-]+)/?>?\s*\)",
    re.I,
)

# Fallback: bare /organization/slug anywhere in text
_ORG_IN_TEXT = re.compile(
    r"(?:https?://[^/\s]*crunchbase.com)?/organization/([a-z0-9_-]+)",
    re.I,
)
```

Each hub gets parsed into rows like:

```
{ "url": "https://www.crunchbase.com/organization/lovable", "slug": "lovable", "title": "Lovable" }
```

Here’s the full code for `parse_hubs.py`

. Note that I also keep a blocklist of well-known VCs and accelerators (`y-combinator`

, `techstars`

, `andreessen-horowitz`

, etc.) that show up on hub pages but are the *investors*, not the companies being listed. Without this, you get YC ranked #1 on every hub it's ever touched, which is obviously not what we want.

**parse_hubs.py**

```
"""Parse Crunchbase hub pages (markdown or HTML) for /organization/ links."""

from __future__ import annotations

import re
from typing import Any, Dict, List, Literal, Set
from urllib.parse import urljoin, urlparse

ContentFormat = Literal["html", "markdown"]

_ORG_IN_TEXT = re.compile(
    r"(?:https?://[^/\s]*crunchbase\.com)?/organization/([a-z0-9_-]+)",
    re.I,
)
# [Company Name](https://www.crunchbase.com/organization/slug)
_ORG_MD_LINK = re.compile(
    r"\[([^\]]+)\]\(\s*<?https?://[^>\s)]*crunchbase\.com/organization/([a-z0-9_-]+)/?>?\s*\)",
    re.I,
)
# Bright Data markdown: multi-line link ending with ](/organization/slug "Display Name")
_ORG_REL_LINK = re.compile(
    r"\]\(/organization/([a-z0-9_-]+)(?:\s+\"([^\"]*)\")?\s*\)",
    re.I,
)
_ORG_BLOCKLIST = frozenset(
    {
        "y-combinator",
        "techstars",
        "national-science-foundation",
        "masschallenge",
        "easme",
        "andreessen-horowitz",
        "sequoia-capital",
        "accel",
    }
)

def slug_to_display_name(slug: str) -> str:
    return slug.replace("-", " ").title()

def _append_org(
    rows: List[Dict[str, Any]],
    seen_slugs: Set[str],
    *,
    slug: str,
    title: str,
    hub_url: str,
    max_orgs: int,
) -> None:
    if len(rows) >= max_orgs:
        return
    slug = slug.lower()
    if slug in _ORG_BLOCKLIST or slug in seen_slugs:
        return
    seen_slugs.add(slug)
    base = f"{urlparse(hub_url).scheme}://{urlparse(hub_url).netloc}"
    name = (title or "").strip() or slug_to_display_name(slug)
    rows.append(
        {
            "url": urljoin(base, f"/organization/{slug}"),
            "slug": slug,
            "title": name,
        }
    )

def parse_organizations_from_markdown(
    markdown: str,
    hub_url: str,
    *,
    max_orgs: int = 80,
) -> List[Dict[str, Any]]:
    """Extract orgs from markdown links; fall back to bare organization URLs."""
    seen_slugs: Set[str] = set()
    rows: List[Dict[str, Any]] = []

    for match in _ORG_REL_LINK.finditer(markdown):
        slug = match.group(1)
        title = (match.group(2) or "").strip()
        _append_org(rows, seen_slugs, slug=slug, title=title, hub_url=hub_url, max_orgs=max_orgs)
        if len(rows) >= max_orgs:
            return rows

    for match in _ORG_MD_LINK.finditer(markdown):
        title, slug = match.group(1).strip(), match.group(2)
        _append_org(rows, seen_slugs, slug=slug, title=title, hub_url=hub_url, max_orgs=max_orgs)
        if len(rows) >= max_orgs:
            return rows

    if rows:
        return rows

    for match in _ORG_IN_TEXT.finditer(markdown):
        _append_org(
            rows,
            seen_slugs,
            slug=match.group(1),
            title="",
            hub_url=hub_url,
            max_orgs=max_orgs,
        )
        if len(rows) >= max_orgs:
            break
    return rows

def parse_organizations_from_html(
    html: str,
    hub_url: str,
    *,
    max_orgs: int = 80,
) -> List[Dict[str, Any]]:
    """Extract unique organization rows from hub page HTML."""
    seen_slugs: Set[str] = set()
    rows: List[Dict[str, Any]] = []

    for match in _ORG_IN_TEXT.finditer(html):
        _append_org(
            rows,
            seen_slugs,
            slug=match.group(1),
            title="",
            hub_url=hub_url,
            max_orgs=max_orgs,
        )
        if len(rows) >= max_orgs:
            break
    return rows

def parse_organizations(
    body: str,
    hub_url: str,
    *,
    content_format: ContentFormat = "markdown",
    max_orgs: int = 80,
) -> List[Dict[str, Any]]:
    if content_format == "markdown":
        return parse_organizations_from_markdown(body, hub_url, max_orgs=max_orgs)
    return parse_organizations_from_html(body, hub_url, max_orgs=max_orgs)
```

**First-run gotcha I hit was a classic.** My original parser expected absolute URLs (`https://www.crunchbase.com/organization/...`

), but Bright Data's markdown renderer produces *relative* links (`/organization/slug "Display Name"`

) 🙃. So zero companies extracted on the first run — *simply because the regex didn't match*.

So I just added`_ORG_REL_LINK`

to the parser and re-ran Stage 1 with `--parse-only`

, fixing it at no additional API cost. **This is why we cached our raw response bodies.** Your parser will probably need trial-and-erroring more than once, and you don’t want to actually re-fetch the data for that.

**Output of this stage:** A `hub_snapshot.json`

— 96 organizations across 4 hubs (Fintech produced 26, Cybersecurity: 24, SaaS: 22, AI: 24). Note that these are *hub leaderboard* entries, not full Crunchbase exports.

Because the full Crunchbase lists run to *thousands*; I'm taking the curated top slice on purpose, because the cleaner my source is, the more clearly the fuzzy lift shows up against it.

Before clustering, I flatten the nested snapshot into one uniform record per company appearance. `extract.py`

handles this:

```
"""From hub_snapshot.json to raw_records.json with company_name per organization."""    

from __future__ import annotations    

import json    
from datetime import datetime, timezone    
from pathlib import Path    
from typing import Any, Dict, List    

def records_from_hub_snapshot(data: Dict[str, Any]) -> List[Dict[str, Any]]:    
    records: List[Dict[str, Any]] = []    
    for hi, block in enumerate(data.get("hubs") or []):    
        if block.get("error"):    
            continue    
        category = (block.get("category") or "unknown").strip()    
        hub_url = block.get("hub_url") or ""    
        for ri, row in enumerate(block.get("rows") or []):    
            if not isinstance(row, dict):    
                continue    
            url = (row.get("url") or "").strip()    
            if not url or "/organization/" not in url.lower():    
                continue    
            title = (row.get("title") or "").strip()    
            slug = (row.get("slug") or "").strip()    
            company_name = title or (slug.replace("-", " ").title() if slug else "")    
            if not company_name:    
                continue    
            records.append(    
                {    
                    "id": f"hub:{hi}:{ri}",    
                    "source": "crunchbase_hub",    
                    "category": category,    
                    "company_name": company_name,    
                    "raw_name": title or company_name,    
                    "url": url,    
                    "domain": "www.crunchbase.com",    
                    "hub_url": hub_url,    
                    "position": ri + 1,    
                }    
            )    
    return records    

def build_raw_payload(snapshot_path: Path) -> Dict[str, Any]:    
    raw = json.loads(snapshot_path.read_text(encoding="utf-8"))    
    if not isinstance(raw.get("hubs"), list):    
        raise ValueError(f"{snapshot_path}: expected hub snapshot with 'hubs' array")    
    records = records_from_hub_snapshot(raw)    
    return {    
        "extracted_at": datetime.now(timezone.utc).isoformat(),    
        "snapshot": str(snapshot_path.name),    
        "record_count": len(records),    
        "records": records,    
    }    

def write_raw_records(snapshot_path: Path, out_path: Path) -> Dict[str, Any]:    
    payload = build_raw_payload(snapshot_path)    
    out_path.parent.mkdir(parents=True, exist_ok=True)    
    out_path.write_text(    
        json.dumps(payload, indent=2, ensure_ascii=False) + "n",    
        encoding="utf-8",    
    )    
    return payload
```

The `id`

field (`hub:0:3`

, `hub:2:11`

, etc.) is our stable key that links each raw record to its canonical cluster in Stage 4. Deterministic, derivable from position, and most importantly, easy to debug.

**Output:** `raw_records.json`

— 96 rows, all `source: "crunchbase_hub"`

fields, tagged by category.

**Entity resolution reconciliation** (Stage 4) collapses duplicate company names into canonical clusters. In this dataset, 96 scraped rows become **88 canonical companies** after normalization and fuzzy clustering. Four names show up on more than one hub — **Callaghan Innovation** and **EISMEA** on all four leaderboards, **PayTic** and **SixThirty** on two — which gives duplicate rows before clustering.

After exact normalization there are **88** distinct normalized names, which happens to be the same count as final clusters at WRatio threshold 90 — meaning *no additional fuzzy merges were needed beyond collapsing the cross-hub duplicates*.

I run reconciliation in two passes.

See full code here for

reconcile.py:[https://gist.github.com/sixthextinction/5c711e48353f4f7765e13cc4bb1b25de]

```
# reconcile.py  
_LEGAL     = re.compile(  
    r"b(inc.?|llc.?|ltd.?|plc.?|corp.?|corporation|co.?|company|limited)b",  
    re.I,  
)  
_NON_ALNUM = re.compile(r"[^ws]", re.UNICODE)  

def normalize_company_name(s: str) -> str:  
    s = s.lower().strip()  
    s = _NON_ALNUM.sub(" ", s)   # strip punctuation  
    s = _LEGAL.sub(" ", s)       # drop legal suffixes  
    s = re.sub(r"s+", " ", s).strip()  
    return s
```

After normalization, I group records by their normalized string. `"Lovable"`

, `"lovable"`

, and `"Lovable."`

all collapse into the same group. This removes trivial duplicates *before* the more expensive fuzzy-matching pass. **TL;DR: Do the cheap pass first, expensive pass second** — same reason you’d put a `WHERE`

clause *before* a `JOIN`

.

For this dataset that’s **96 hash inserts** — one `normalize_company_name()`

+ one dict lookup per row — roughly **~O(n)**.

The important optimization is that normalization shrinks the search space *before* the quadratic fuzzy pass runs. Without Pass 1, naïve all-pairs fuzzy matching over `n = 10,000`

unique names would require:

n(n−1)/2 ≈ 50 million comparisons

It takes my laptop ~**1.3 µs** per RapidFuzz `WRatio`

call on ~30-character names, so that pushes our runtime toward ~60 seconds instead of milliseconds. Not ideal — which is exactly why Pass 1 exists, to reduce `n`

before the O(n²) step becomes expensive.

I then compare each exact group against existing clusters using **WRatio** from RapidFuzz.

``` python
# reconcile.py  
from rapidfuzz import fuzz  
_FUZZY_SCORER = fuzz.WRatio  

def _fuzzy_merge_groups(    groups: List[List[Dict[str, Any]]],  
    threshold: float,           # default: 90.0) -> List[Cluster]:  
    clusters: List[Cluster] = []  
    for group in sorted(groups, key=lambda g: (  
        min(_source_rank(m.get("source") or "") for m in g),  
        -len(g),  
    )):  
        rep = pick_canonical_name(group)  
        placed = False  
        for cluster in clusters:  
            if _FUZZY_SCORER(rep, cluster.canonical_name) >= threshold:  
                cluster.members.extend(group)  
                cluster.canonical_name = pick_canonical_name(cluster.members)  
                cluster.canonical_id   = make_canonical_id(cluster.canonical_name)  
                placed = True  
                break  
        if not placed:  
            clusters.append(Cluster(  
                canonical_id=make_canonical_id(rep),  
                canonical_name=rep,  
                members=list(group),  
            ))  
    return clusters
```

Here, we have to compare each group’s representative against existing cluster canonicals. So the worst case with `g = 88`

exact groups would be

0 + 1 + 2 + … + 87 = 3,828 comparisons

That’s roughly ~O(g²).

RapidFuzz ships several scorers — see the [rapidfuzz.fuzz docs](https://rapidfuzz.github.io/RapidFuzz/Usage/fuzz.html) for the full list. We use [fuzz.WRatio](https://rapidfuzz.github.io/RapidFuzz/Usage/fuzz.html#wratio) (weighted ratio; same algorithm family as [FuzzyWuzzy’s WRatio](https://github.com/seatgeek/fuzzywuzzy)) because company names drift in different ways and no single metric covers all of them.

[WRatio](https://rapidfuzz.github.io/RapidFuzz/Usage/fuzz.html#wratio) is a **meta-scorer**: for each pair of strings it runs several ratio algorithms internally (with length-based weighting) and returns the best score. It combines:

`Necker FinTech`

vs `Necker FinTech Holdings Inc.`

looks like a poor match).`Inc.`

or `Group`

.You rarely know in advance *which* kind of drift a CRM row will have — suffix appended, spacing changed, words reordered. **WRatio picks the strategy that scores highest for that specific pair**, which is exactly what you want for entity resolution on names alone.

We default to **threshold 90**: strict enough that unrelated pairs (`Stripe`

vs `Climate Corp`

) stay out, loose enough that real variants (`PointsKash`

vs `Points Kash`

) merge. Tune it on your data.

On this dataset specifically, WRatio handles the drift patterns we actually see in company names (or historically have, anyway):

| Hub / scraped name | CRM variant (in `sample_crm.json` ) |
Drift type |
|---|---|---|
`Necker FinTech` |
`Necker FinTech Holdings Inc.` |
Legal suffix + spacing (`Fin Tech` vs `FinTech` ) |
`PANTA` |
`PANTA Group` |
Type descriptor appended |
`Physical Intelligence` |
`Physical Intelligence (Pi), Inc.` |
Parenthetical + legal suffix |
`PointsKash` |
`Points Kash` |
Token spacing |
`qBotica` |
`q Botica` |
Token spacing |

Pure `ratio`

(edit distance) would heavily penalize `Necker FinTech`

vs `Necker FinTech Holdings Inc.`

because three extra words add significant distance. So`partial_ratio`

handles containment and`token_set_ratio`

handles reordering. **WRatio picks the strategy that produces the best score for each specific pair** — which is exactly the behavior you want when you don't know in advance *how* a name is going to drift.

Display names with **no shared tokens** to the legal entity — e.g. hub title `Investing.com`

vs operator `Fusion Media Limited`

, or `Lyrie.ai`

vs `OTT Cybersecurity Inc.`

— stay below threshold. WRatio correctly refuses to merge them. That’s a good thing — those belong in a lookup table or enrichment API, not in a string-similarity pass (see Caveats).

One last thing before we move on to the demo — every time a cluster gains new members, I re-evaluate its canonical name. The source ranking (`crunchbase_hub`

= 0, anything else = 99) ensures that short, clean display names win over longer legal variants:

``` php
def pick_canonical_name(members: Sequence[Dict[str, Any]]) -> str:  
    def sort_key(m):  
        name = (m.get("company_name") or "").strip()  
        return (_source_rank(m.get("source") or ""), len(name), name.lower())  
    return min(members, key=sort_key)["company_name"].strip()
```

A Crunchbase display name like `"Lovable"`

will always beat `"Lovable Technologies Inc."`

as the canonical — it's from a trusted source *and* it's shorter. The legal variant ends up as an alias, which is exactly the right relationship.

**Output of this stage:** `reconciled.json`

— 88 canonical clusters, alias mappings with WRatio scores, and CRM join metrics.

That’s it, we’re all done with the fuzzy pipeline. Let’s see if that improved things.

Our `sample_crm.json`

simulates the data you’d get from a real CRM — I simply researched legal names and known alternate spellings online for the companies I had, and put it in a JSON file. **This gave me 138 rows representing the same 88 canonical companies.**

Some companies had one exact-match entry — these are easy for us to handle. Others had three or four variants that I’d name like this:

```
{ "id": "crm:necker_fintech_0", "company_name": "Necker Fin Tech" },  
{ "id": "crm:necker_fintech_1", "company_name": "Necker FinTech Group" },  
{ "id": "crm:necker_fintech_2", "company_name": "Necker FinTech Holdings Inc." }
```

Our join logic in the`post_fuzzy_eval.py`

demo runs exact normalization first, then falls back to fuzzy — note how this is the same “cheap pass first” pattern as the cluster builder:

**post_fuzzy_eval.py**

```
"""Optional CRM join evaluation — exact vs fuzzy match rates (not part of core reconcile)."""

from __future__ import annotations

import json
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence

from rapidfuzz import fuzz

from reconcile import (
    Cluster,
    DEFAULT_THRESHOLD,
    _exact_groups,
    normalize_company_name,
)

_FUZZY_SCORER = fuzz.WRatio

def load_crm(path: Path) -> List[Dict[str, Any]]:
    raw = json.loads(path.read_text(encoding="utf-8"))
    if isinstance(raw, list):
        rows = raw
    elif isinstance(raw, dict) and "companies" in raw:
        rows = raw["companies"]
    else:
        raise ValueError(f"{path}: expected list or {{'companies': [...]}}")
    out: List[Dict[str, Any]] = []
    for i, row in enumerate(rows):
        if not isinstance(row, dict):
            continue
        name = (row.get("company_name") or "").strip()
        if not name:
            continue
        out.append(
            {
                "id": row.get("id") or f"crm:{i}",
                "company_name": name,
            }
        )
    return out

def _record_to_cluster_map(clusters: Sequence[Cluster]) -> Dict[str, str]:
    out: Dict[str, str] = {}
    for cluster in clusters:
        for m in cluster.members:
            out[m["id"]] = cluster.canonical_id
    return out

def crm_to_canonical(
    crm_rows: Sequence[Dict[str, Any]],
    clusters: Sequence[Cluster],
    threshold: float,
) -> Dict[str, Optional[str]]:
    out: Dict[str, Optional[str]] = {}
    for row in crm_rows:
        key = str(row.get("id") or row.get("company_name"))
        name = (row.get("company_name") or "").strip()
        if not name:
            out[key] = None
            continue
        norm = normalize_company_name(name)
        matched: Optional[str] = None
        for cluster in clusters:
            if any(
                normalize_company_name(m.get("company_name") or "") == norm
                for m in cluster.members
            ):
                matched = cluster.canonical_id
                break
        if not matched:
            best_score = 0.0
            best_id: Optional[str] = None
            for cluster in clusters:
                score = _FUZZY_SCORER(name, cluster.canonical_name)
                if score > best_score:
                    best_score = score
                    best_id = cluster.canonical_id
            matched = best_id if best_score >= threshold else None
        out[key] = matched
    return out

def join_metrics(
    records: Sequence[Dict[str, Any]],
    crm_rows: Sequence[Dict[str, Any]],
    clusters: Sequence[Cluster],
    threshold: float,
) -> Dict[str, Any]:
    record_to_cid = _record_to_cluster_map(clusters)
    crm_to_cid = crm_to_canonical(crm_rows, clusters, threshold)

    crm_norms = {
        normalize_company_name((r.get("company_name") or ""))
        for r in crm_rows
        if normalize_company_name(r.get("company_name") or "")
    }
    crm_mapped_cids = {v for v in crm_to_cid.values() if v}

    scraped_exact = 0
    scraped_fuzzy = 0
    for r in records:
        norm = normalize_company_name(r.get("company_name") or "")
        if norm in crm_norms:
            scraped_exact += 1
        cid = record_to_cid.get(r["id"])
        if cid and cid in crm_mapped_cids:
            scraped_fuzzy += 1

    crm_exact = 0
    crm_fuzzy = 0
    scraped_norms = {
        normalize_company_name(r.get("company_name") or "") for r in records
    }
    scraped_cids = set(record_to_cid.values())
    for row in crm_rows:
        norm = normalize_company_name(row.get("company_name") or "")
        if norm in scraped_norms:
            crm_exact += 1
        cid_key = str(row.get("id") or row.get("company_name"))
        cid = crm_to_cid.get(cid_key)
        if cid and cid in scraped_cids:
            crm_fuzzy += 1

    n_scraped = len(records) or 1
    n_crm = len(crm_rows) or 1
    return {
        "scraped_rows": len(records),
        "crm_rows": len(crm_rows),
        "canonical_clusters": len(clusters),
        "exact_normalized_unique": len(_exact_groups(records)),
        "scraped_exact_join_pct": round(100.0 * scraped_exact / n_scraped, 1),
        "scraped_fuzzy_join_pct": round(100.0 * scraped_fuzzy / n_scraped, 1),
        "crm_exact_join_pct": round(100.0 * crm_exact / n_crm, 1),
        "crm_fuzzy_join_pct": round(100.0 * crm_fuzzy / n_crm, 1),
    }

def eval_crm_join(
    records: Sequence[Dict[str, Any]],
    clusters: Sequence[Cluster],
    crm_path: Path,
    threshold: float = DEFAULT_THRESHOLD,
) -> Dict[str, Any]:
    """Load CRM file and compute join metrics against existing clusters."""
    crm_rows = load_crm(crm_path)
    return join_metrics(records, crm_rows, clusters, threshold)
```

**Here’s how we measure this JOIN operation** (`join_metrics`

in `post_fuzzy_eval.py`

):

`company_name`

appears in the set of normalized CRM names.So how did we do?

| Question | Exact match | Fuzzy (WRatio ≥ 90) |
|---|---|---|
Of 96 scraped rows, how many link to a CRM row? |
58.3% (56 rows) |
100% (96 rows) |
Of 138 CRM rows, how many link back to scraped data? |
34.8% (48 rows) |
100% (138 rows) |

The 58.3% exact baseline isn’t actually bad — over half of raw hub titles normalize to a CRM string exactly. The other 41.7% however, absolutely need fuzzy matching via WRatio because the CRM holds legal or alternate spellings (`Necker FinTech Holdings Inc.`

vs hub `Necker FinTech`

, etc.) that no amount of lowercasing or other normalization will save you from.

The fuzzy pass closes the gap on this dataset at WRatio threshold 90. **WRatio is strict enough to avoid merging unrelated names while still picking up suffix and token drift** — which is fantastic — just what we want!

Commands below assume Python 3.10+ and a venv. All of this runs locally; the only network calls are to Bright Data during the initial fetch.

```
# Install deps  
pip install rapidfuzz requests python-dotenv  

# Fetch all 4 hubs (costs API credits)  
python fetch_hubs.py  

# Already have cached responses? Re-parse for free  
python fetch_hubs.py --parse-only  

# Extract + reconcile + print CRM metrics (default: both stages)  
python run_fuzzy.py  

# Regenerate sample_crm.json from raw_records (optional)  
python build_sample_crm.py  

# Tune the threshold (try 85 for more aggressive merging)  
python run_fuzzy.py --threshold 85  

# Run individual stages  
python run_fuzzy.py --extract  
python run_fuzzy.py --reconcile
```

Sample CLI output after a full run:

```
wrote data/raw_records.json  
  records: 96  
  category artificial_intelligence: 24  
  category cybersecurity: 24  
  category fintech: 26  
  category saas: 22  

wrote data/reconciled.json  

-- join metrics (CRM) --  
  scraped rows: 96 | exact-normalized unique: 88 | canonical clusters: 88  
  scraped -> CRM  exact: 58.3% | fuzzy: 100.0%  
  CRM -> scraped   exact: 34.8% | fuzzy: 100.0%  

-- top 10 canonicals (by alias count) --  
  Callaghan Innovation  (4 aliases, sources: crunchbase_hub)  
  EISMEA  (4 aliases, sources: crunchbase_hub)  
  PayTic  (2 aliases, sources: crunchbase_hub)  
  SixThirty  (2 aliases, sources: crunchbase_hub)  
  ...more
```

I’ve also added a diagnostic queue into the pipeline for low-confidence alias assignments — records whose WRatio against their cluster’s canonical falls *below* the threshold. This will show us merges that look suspicious and deserve a human eye:

``` python
# reconcile.py  
def review_queue(  
    records: Sequence[Dict[str, Any]],  
    clusters: Sequence[Cluster],  
    threshold: float,  
    limit: int = 8,  
) -> List[Tuple[float, str, str, str]]:  
    rid_to_cluster = {m["id"]: c for c in clusters for m in c.members}  
    lows = []  
    for r in records:  
        c     = rid_to_cluster.get(r["id"])  
        name  = r.get("company_name") or ""  
        score = _FUZZY_SCORER(name, c.canonical_name)  
        if score < threshold:  
            lows.append((score, name, c.canonical_name, c.canonical_id))  
    lows.sort(key=lambda x: x[0])  
    return lows[:limit]
```

In production this would feed a human-review UI or write to a `needs_review`

table. Here it just prints to stdout — but my point stands: **fuzzy matching isn't a black box. You can always surface the borderline decisions and let a human confirm them.**

That’s everything, thanks for reading!

**Q: Do you need ML or vector embeddings for company name matching?**

**A:** No, not for stylistic drift (legal suffixes, spacing, punctuation). Our pipeline uses RapidFuzz [fuzz.WRatio](https://rapidfuzz.github.io/RapidFuzz/Usage/fuzz.html#wratio) —which is a rule-based string similarity, not a trained model.

**Q: What similarity threshold should you use with WRatio?**

**A:** Start at **WRatio threshold 90**. At 90, unrelated pairs like `Stripe`

vs `Climate Corp`

score 45.0 and stay out, while suffix/spacing variants like `Necker FinTech`

vs `Necker FinTech Holdings Inc.`

score 90.0+ and merge. See the [score_cutoff](https://rapidfuzz.github.io/RapidFuzz/Usage/fuzz.html#wratio) parameter in the docs if you want early-exit optimization.

**Q: When does fuzzy matching fail for company names?**

**A:** When names share almost no tokens — e.g. brand `Investing.com`

vs legal entity `Fusion Media Limited`

(WRatio 30.0). Use a lookup table, domain, LEI, or enrichment API instead.

**Q: Why not join on** `company_name`

**in SQL?**

**A:** Because raw name joins will often miss legal variants. Resolve each row to a `canonical_id`

in Python, load clusters into Postgres, and only then can you safely do a `JOIN ... USING (canonical_id)`

.

I should clear some things up about this tutorial.

`ARYZE ApS`

, `Count Finance LTD`

, `PANTA Group`

. A real CRM would actually be dirtier: misspellings, stale names, entries from multiple import sources with inconsistent formatting. In practice the fuzzy pass may not hit 100%, but it'll still get you much closer than exact matching does.`Investing.com`

/ `Fusion Media Limited`

or `Lyrie.ai`

/ `OTT Cybersecurity Inc.`

share almost no tokens, so WRatio stays low and that's `slug → legal_name`

map. Fuzzy matching handles stylistic drift on the The normalize → exact-group → fuzzy-cluster → CRM join pattern I’ve described here applies directly to:

`Acme Corp`

, `Acme Corporation`

, and `ACME`

before they become three separate accounts in your sales pipeline.**That WRatio threshold is something you should play around with.** At WRatio threshold 90 (the default in this pipeline), clearly unrelated pairs stay out (`Stripe`

vs `Climate Corp`

scores 45.0) while suffix and spacing drift gets in. Drop to 80 and you'll catch more variants but start seeing false positives. This will differ based on your dataset, obviously, and the review queue is your safety net either way.

**Next step in production:** load `reconciled.json`

into Postgres, resolve each CRM row to a `canonical_id`

(same logic as `_crm_to_canonical`

in Python), then join on that key instead of `company_name`

.

```
-- Tables loaded from pipeline output (reconciled.json + raw_records + sample_crm)  
CREATE TABLE canonicals (  
  canonical_id   TEXT PRIMARY KEY,  
  canonical_name TEXT NOT NULL  
);  

CREATE TABLE entity_aliases (  
  canonical_id TEXT NOT NULL REFERENCES canonicals (canonical_id),  
  alias_name   TEXT NOT NULL,  
  source       TEXT,  
  match_score  NUMERIC,  
  PRIMARY KEY (canonical_id, alias_name)  
);  

CREATE TABLE hub_scrape (  
  id            TEXT PRIMARY KEY,  
  company_name  TEXT NOT NULL,  
  canonical_id  TEXT REFERENCES canonicals (canonical_id),  
  category      TEXT,  
  url           TEXT  
);  

CREATE TABLE crm_accounts (  
  id            TEXT PRIMARY KEY,  
  company_name  TEXT NOT NULL,  
  canonical_id  TEXT REFERENCES canonicals (canonical_id)  -- from Python CRM mapping  
);  

-- Broken: join on raw company_name  
SELECT COUNT(*) AS matched_rows  
FROM   hub_scrape h  
JOIN   crm_accounts c ON c.company_name = h.company_name;  
-- 56 / 96 (~58%) on this dataset  

-- Fixed: join on canonical_id (assigned during ETL from reconciled.json)  
SELECT h.company_name AS hub_name,  
       c.company_name AS crm_name,  
       h.canonical_id  
FROM   hub_scrape h  
JOIN   crm_accounts c USING (canonical_id)  
WHERE  h.company_name = 'Necker FinTech';  
-- hub_name: Necker FinTech  
-- crm_name: Necker FinTech Holdings Inc.  (or Necker Fin Tech, etc.)  
-- canonical_id: c_necker_fintech
```

`canonicals`

and `entity_aliases`

from `reconciled.json`

.`hub_scrape.canonical_id`

from the `aliases`

array (`id`

→ `canonical_id`

).`crm_accounts.canonical_id`

with the same `_crm_to_canonical`

logic you already run in Python (exact norm match, then WRatio ≥ 90).After that, SQL stays a plain equi-join — **fuzzy matching happens once upstream, and not inside the database.** I won’t cover that though; the *pattern* is the point, not the warehouse you choose to use.

None of this is new — entity resolution is a well-studied problem with industrial-strength tools (Dedupe, [Splink](https://moj-analytical-services.github.io/splink/demos/examples/duckdb/deterministic_dedupe.html), various record linkage toolkits) when you need them. **But for the common case of “I have two lists of company names and I need to join them,” you really don’t.** A normalization pass and a WRatio threshold gets you most of the way there in an afternoon, in pure Python, with zero infrastructure.
