Replacing a Managed ETL Connector with a Custom CDC Sync

by Marc

We had been paying roughly 400 euros a month for a managed ETL connector to sync our CRM data into BigQuery. Six tables, a few thousand records each, updated a handful of times per day. The connector worked, but it had problems: data landed in a US-region dataset (we needed EU for GDPR compliance), the CDC implementation had quirks that caused phantom duplicates, and every time we needed to debug something we were staring at a black box. So I replaced it with a custom Cloud Run service. The whole thing runs for under 5 euros a month.

The Architecture

The service has two endpoints. POST /webhook receives real-time change events from the CRM’s webhook system: every time a deal, organisation, or person is created, updated, or deleted, the CRM pushes a payload to our endpoint. POST /sync runs a full reconciliation: it pulls every record from the CRM API and writes a complete snapshot to BigQuery, deactivating all previous records first. A Cloud Scheduler job triggers the full sync weekly as a safety net.

CDC Column Design

The key design decision was keeping the exact same CDC column naming that the managed connector used: _cdc_active, _cdc_start, _cdc_end, and _cdc_synced. This meant zero changes to our 200+ dbt models downstream. The transformation layer didn’t care that the data was now coming from a custom service instead of a managed connector — the column names and semantics were identical.

Each table uses BigQuery’s native partitioning on _cdc_start (by day) and clustering on id. This gives us efficient time-range scans for the dbt incremental models and fast point lookups when debugging individual records.

# BigQuery table definitions (simplified from the CREATE TABLE statements)
#
# Tables: deal_history, organization_history, person_history,
#         user_history, pipeline_history, stage_history
#
# All share this CDC column pattern:
#   id           INT64      -- CRM record ID
#   ...                     -- all CRM fields as JSON-extracted columns
#   _cdc_active  BOOL       -- TRUE for current version
#   _cdc_start   TIMESTAMP  -- when this version became active
#   _cdc_end     TIMESTAMP  -- when this version was superseded (NULL if active)
#   _cdc_synced  TIMESTAMP  -- when we wrote this row
#
# Partitioned by: _cdc_start (DAY)
# Clustered by: id
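
For illustration, the column pattern above could be turned into DDL along these lines. This is only a sketch that builds the statement text: the helper name build_history_ddl and the example business columns (title, value) are assumptions, not the actual table definitions.

```python
from typing import Mapping

# CDC columns shared by every history table (names taken from the article).
CDC_COLUMNS = {
    "_cdc_active": "BOOL",
    "_cdc_start": "TIMESTAMP",
    "_cdc_end": "TIMESTAMP",
    "_cdc_synced": "TIMESTAMP",
}


def build_history_ddl(dataset: str, object_type: str,
                      crm_columns: Mapping[str, str]) -> str:
    """Return a CREATE TABLE statement for one <object>_history table.

    crm_columns maps column name -> BigQuery type. The values used in the
    example at the bottom are illustrative placeholders.
    """
    columns = {"id": "INT64", **crm_columns, **CDC_COLUMNS}
    column_sql = ",\n".join(
        f"  {name} {bq_type}" for name, bq_type in columns.items()
    )
    return (
        f"CREATE TABLE IF NOT EXISTS `{dataset}.{object_type}_history` (\n"
        f"{column_sql}\n"
        ")\n"
        "PARTITION BY DATE(_cdc_start)\n"  # daily partitions on the version start
        "CLUSTER BY id"                    # fast point lookups by record ID
    )


if __name__ == "__main__":
    print(build_history_ddl("crm_sync", "deal",
                            {"title": "STRING", "value": "FLOAT64"}))
```

Generating the statement from one shared column map is one way to keep all six tables on the same CDC contract.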

The Webhook Handler

When the CRM fires a webhook, it sends a JSON payload with the event type (added, updated, deleted) and the current state of the record. The handler transforms this into CDC rows: it deactivates the previous version by setting _cdc_end to now and _cdc_active to false, then inserts a new row with the current state.

import hashlib
import hmac
import json
import logging
import os
from datetime import datetime, timezone

from fastapi import FastAPI, Request, HTTPException
from google.cloud import bigquery

log = logging.getLogger(__name__)

app = FastAPI()
bq_client = bigquery.Client()
DATASET = "crm_sync"
WEBHOOK_SECRET = os.environ["WEBHOOK_SECRET"]

def verify_signature(body: bytes, signature: str) -> bool:
    """Verify the webhook signature using HMAC-SHA256."""
    expected = hmac.new(
        WEBHOOK_SECRET.encode(), body, hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(expected, signature)

@app.post("/webhook")
async def handle_webhook(request: Request):
    body = await request.body()
    signature = request.headers.get("X-Webhook-Signature", "")

    if not verify_signature(body, signature):
        raise HTTPException(status_code=401, detail="Invalid signature")

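    payload = json.loads(body)
    event_type = payload["meta"]["action"]    # added, updated, deleted
    object_type = payload["meta"]["object"]   # deal, organization, person, ...
    # The table name is interpolated into SQL below, so accept only known types.
    if object_type not in {"deal", "organization", "person",
                           "user", "pipeline", "stage"}:
        raise HTTPException(status_code=400, detail="Unsupported object type")
    record = payload["current"]
    record_id = record["id"]
    now = datetime.now(timezone.utc)
    table = f"{DATASET}.{object_type}_history"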

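    # Step 1: Deactivate previous active version.
    # Caveat: BigQuery DML cannot modify rows still in the streaming buffer,
    # so an update that closely follows a streaming insert of the same record
    # can fail here and may need to be retried.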
    deactivate_sql = f"""
        UPDATE `{table}`
        SET _cdc_active = FALSE,
            _cdc_end = @now
        WHERE id = @record_id
          AND _cdc_active = TRUE
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("now", "TIMESTAMP", now),
            bigquery.ScalarQueryParameter("record_id", "INT64", record_id),
        ]
    )
    bq_client.query(deactivate_sql, job_config=job_config).result()

    # Step 2: Insert new version (unless deleted)
    if event_type != "deleted":
        row = flatten_crm_record(record)
        row["_cdc_active"] = True
        row["_cdc_start"] = now.isoformat()
        row["_cdc_end"] = None
        row["_cdc_synced"] = now.isoformat()

        errors = bq_client.insert_rows_json(table, [row])
        if errors:
            raise HTTPException(status_code=500, detail=str(errors))

    return {"status": "ok", "action": event_type, "id": record_id}
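
To make the row-level effect of the handler concrete, here is a small in-memory sketch of the same two steps applied to a list of dicts instead of a BigQuery table. The helper apply_event and the sample timestamps are hypothetical and exist only for illustration; they are not part of the service.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List

Row = Dict[str, Any]


def apply_event(history: List[Row], event_type: str, record: Row,
                now: datetime) -> None:
    """Mutate `history` the way the webhook handler mutates a history table."""
    # Step 1: close the currently active version of this record, if any.
    for row in history:
        if row["id"] == record["id"] and row["_cdc_active"]:
            row["_cdc_active"] = False
            row["_cdc_end"] = now

    # Step 2: append the new version, unless the record was deleted.
    if event_type != "deleted":
        history.append({
            **record,
            "_cdc_active": True,
            "_cdc_start": now,
            "_cdc_end": None,
            "_cdc_synced": now,
        })


if __name__ == "__main__":
    history: List[Row] = []
    t1 = datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc)
    t2 = datetime(2024, 1, 1, 10, 0, tzinfo=timezone.utc)
    t3 = datetime(2024, 1, 1, 11, 0, tzinfo=timezone.utc)

    apply_event(history, "added", {"id": 1, "title": "New deal"}, t1)
    apply_event(history, "updated", {"id": 1, "title": "Renamed deal"}, t2)
    apply_event(history, "deleted", {"id": 1, "title": "Renamed deal"}, t3)

    for row in history:
        print(row["title"], row["_cdc_active"], row["_cdc_start"], row["_cdc_end"])
```

After the three events the history holds two closed versions and no active row, which is how a delete is represented in this scheme.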

The flatten_crm_record function handles the CRM’s nested JSON structure — custom fields come as hash-keyed objects like {"abc123": "value"} that need to be mapped to human-readable column names. I maintain a field mapping that’s generated from the CRM’s fields API endpoint.
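
A minimal sketch of what such a function might look like, assuming a one-level flattening rule for nested objects. The mapping entry ("abc123" to lead_source) and the prefixing convention are illustrative assumptions; the real mapping comes from the CRM's fields API and the real function may handle more cases.

```python
from typing import Any, Dict, Mapping

# Hypothetical mapping from the CRM's hash keys to readable column names.
FIELD_MAPPING: Dict[str, str] = {
    "abc123": "lead_source",
}


def flatten_crm_record(
    record: Mapping[str, Any],
    field_mapping: Mapping[str, str] = FIELD_MAPPING,
) -> Dict[str, Any]:
    """Flatten one CRM record into a single-level dict of column -> value."""
    row: Dict[str, Any] = {}
    for key, value in record.items():
        # Hash-keyed custom fields get their readable name; others keep theirs.
        column = field_mapping.get(key, key)
        if isinstance(value, Mapping):
            # Assumption: nested objects become prefixed columns (owner -> owner_id).
            for sub_key, sub_value in value.items():
                row[f"{column}_{sub_key}"] = sub_value
        else:
            row[column] = value
    return row


if __name__ == "__main__":
    print(flatten_crm_record(
        {"id": 7, "abc123": "web", "owner": {"id": 3, "name": "Ana"}}
    ))
```

Unknown keys pass through unchanged here, so a custom field added in the CRM would surface under its hash key until the mapping is regenerated.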

Full Sync: The Reconciliation Safety Net

Webhooks can be missed. Networks drop packets, services restart, endpoints return 5xx under load. The full sync endpoint exists to catch anything the webhooks missed. It runs weekly via Cloud Scheduler, and the logic is straightforward: deactivate everything, then write fresh.

@app.post("/sync")
async def full_sync(request: Request):
    verify_cron_secret(request)  # X-Sync-Secret header check
    now = datetime.now(timezone.utc)

    for object_type in ["deal", "organization", "person",
                        "user", "pipeline", "stage"]:
        table = f"{DATASET}.{object_type}_history"

        # Deactivate all current records
        bq_client.query(f"""
            UPDATE `{table}`
            SET _cdc_active = FALSE,
                _cdc_end = @now
            WHERE _cdc_active = TRUE
        """, job_config=make_params(now=now)).result()

        # Fetch all records from CRM API (paginated)
        records = fetch_all_from_crm(object_type)

        # Batch insert as new active versions
        rows = []
        for record in records:
            row = flatten_crm_record(record)
            row["_cdc_active"] = True
            row["_cdc_start"] = now.isoformat()
            row["_cdc_end"] = None
            row["_cdc_synced"] = now.isoformat()
            rows.append(row)

        # Insert in batches of 500
        for i in range(0, len(rows), 500):
            batch = rows[i:i+500]
            errors = bq_client.insert_rows_json(table, batch)
            if errors:
                log.error(f"Insert errors for {table}: {errors}")

    return {"status": "ok", "tables_synced": 6}
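
The paginated fetch is not shown above. A rough sketch of the loop behind a helper like fetch_all_from_crm could look as follows, with the HTTP call itself injected as a function. The name iter_crm_records, the PageFetcher signature, and the offset-based paging are all assumptions, since the CRM's actual pagination scheme is not described here.

```python
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple

Record = Dict[str, Any]
# A page fetcher takes (object_type, start_offset) and returns the records on
# that page plus the offset of the next page, or None when there are no more.
PageFetcher = Callable[[str, int], Tuple[List[Record], Optional[int]]]


def iter_crm_records(object_type: str,
                     fetch_page: PageFetcher) -> Iterator[Record]:
    """Yield every record of one object type, following pagination to the end."""
    start: Optional[int] = 0
    while start is not None:
        records, start = fetch_page(object_type, start)
        yield from records


if __name__ == "__main__":
    # Fake two-page API standing in for the real CRM client.
    pages = {0: ([{"id": 1}, {"id": 2}], 2), 2: ([{"id": 3}], None)}

    def fake_fetch_page(object_type: str, start: int):
        return pages[start]

    print([r["id"] for r in iter_crm_records("deal", fake_fetch_page)])
    # prints [1, 2, 3]
```

Keeping the HTTP call behind a callable makes the loop easy to test without touching the CRM.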

The MERGE Pattern for Downstream Models

Our dbt incremental models use a MERGE statement to process the CDC history tables. This pattern was already in place for the managed connector, and since we kept the same column names, it continued to work unchanged.

-- dbt incremental model pattern (simplified)
-- This MERGE picks up the latest active version of each record.

MERGE INTO {{ this }} AS target
USING (
    SELECT *
    FROM {{ source('crm_sync', 'deal_history') }}
    WHERE _cdc_active = TRUE
      AND _cdc_start > (
          SELECT COALESCE(MAX(_cdc_start), '1970-01-01')
          FROM {{ this }}
      )
) AS source
ON target.id = source.id
WHEN MATCHED THEN
    UPDATE SET
        target.title = source.title,
        target.value = source.value,
        target.status = source.status,
        target._cdc_start = source._cdc_start,
        target._cdc_synced = source._cdc_synced
WHEN NOT MATCHED THEN
    INSERT (id, title, value, status, _cdc_start, _cdc_synced)
    VALUES (source.id, source.title, source.value, source.status,
            source._cdc_start, source._cdc_synced)

Validation: Proving Parity

Before decommissioning the managed connector, I ran both systems in parallel for two weeks and wrote a comparison script to validate record counts and field-level differences across all six tables. The script queries both datasets and reports any discrepancies.

#!/bin/bash
# compare-sync.sh — Validate custom sync vs managed connector
# Compares active record counts and field mismatches for all 6 tables.

TABLES="deal organization person user pipeline stage"
OLD_DATASET="crm_managed"       # managed connector (US)
NEW_DATASET="crm_sync"          # custom sync (EU)
PROJECT="my-data-project"

for table in $TABLES; do
    echo "=== ${table}_history ==="

    # Count active records in each dataset
    old_count=$(bq query --project_id=$PROJECT --use_legacy_sql=false \
        "SELECT COUNT(*) as c FROM \`${OLD_DATASET}.${table}_history\` WHERE _cdc_active = TRUE" \
        --format=csv | tail -1)

    new_count=$(bq query --project_id=$PROJECT --use_legacy_sql=false \
        "SELECT COUNT(*) as c FROM \`${NEW_DATASET}.${table}_history\` WHERE _cdc_active = TRUE" \
        --format=csv | tail -1)

    echo "  Old: $old_count  New: $new_count"

    if [ "$old_count" != "$new_count" ]; then
        echo "  *** COUNT MISMATCH ***"

        # Find IDs present in one but not the other.
        # BigQuery requires parentheses when mixing different set operators.
        bq query --project_id=$PROJECT --use_legacy_sql=false "
            (
                SELECT 'only_in_old' as source, id
                FROM \`${OLD_DATASET}.${table}_history\` WHERE _cdc_active = TRUE
                EXCEPT DISTINCT
                SELECT 'only_in_old', id
                FROM \`${NEW_DATASET}.${table}_history\` WHERE _cdc_active = TRUE
            )
            UNION ALL
            (
                SELECT 'only_in_new', id
                FROM \`${NEW_DATASET}.${table}_history\` WHERE _cdc_active = TRUE
                EXCEPT DISTINCT
                SELECT 'only_in_new', id
                FROM \`${OLD_DATASET}.${table}_history\` WHERE _cdc_active = TRUE
            )
        "
    fi

    # Check field-level mismatches on common records
    bq query --project_id=$PROJECT --use_legacy_sql=false "
        SELECT o.id,
               'title' as field,
               o.title as old_val,
               n.title as new_val
        FROM \`${OLD_DATASET}.${table}_history\` o
        JOIN \`${NEW_DATASET}.${table}_history\` n ON o.id = n.id
        WHERE o._cdc_active = TRUE AND n._cdc_active = TRUE
          AND o.title IS DISTINCT FROM n.title  -- also flags NULL vs non-NULL
        LIMIT 10
    "
    echo ""
done

After two weeks of parallel running, the comparison showed zero mismatches across all six tables: deals (5,208 active records), organisations (4,758), persons (6,904), users (45), pipelines (2), and stages (9).
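
The script above checks field-level differences on the title column only. As a sketch of how the same checks could be generalised to any list of fields, the function below works on active rows that have already been fetched from both datasets. The name compare_active_rows and the sample rows are hypothetical, not part of the original script.

```python
from typing import Any, Dict, Iterable, List

Row = Dict[str, Any]


def compare_active_rows(old_rows: Iterable[Row], new_rows: Iterable[Row],
                        fields: Iterable[str]) -> Dict[str, List[Any]]:
    """Report IDs missing on either side and field mismatches on shared IDs."""
    field_list = list(fields)
    old_by_id = {row["id"]: row for row in old_rows}
    new_by_id = {row["id"]: row for row in new_rows}

    mismatches = []
    for record_id in sorted(old_by_id.keys() & new_by_id.keys()):
        for field in field_list:
            old_val = old_by_id[record_id].get(field)
            new_val = new_by_id[record_id].get(field)
            if old_val != new_val:  # None vs value counts as a mismatch
                mismatches.append((record_id, field, old_val, new_val))

    return {
        "only_in_old": sorted(old_by_id.keys() - new_by_id.keys()),
        "only_in_new": sorted(new_by_id.keys() - old_by_id.keys()),
        "mismatches": mismatches,
    }


if __name__ == "__main__":
    old = [{"id": 1, "title": "A"}, {"id": 2, "title": "B"}]
    new = [{"id": 2, "title": "B2"}, {"id": 3, "title": "C"}]
    print(compare_active_rows(old, new, ["title"]))
```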

Cold Start and Scaling

One gotcha with Cloud Run: cold starts. Webhooks from the CRM have a short timeout, and if the container had been scaled to zero, the first request would occasionally time out. I added Cloud Scheduler jobs to toggle min-instances between 1 during business hours and 0 on nights and weekends. The wake/sleep jobs use the Cloud Run Admin API via a POST with X-HTTP-Method-Override: PATCH (Cloud Scheduler does not support PATCH directly). This keeps weekday webhook latency under 200ms while still saving money outside business hours.

Security

Both endpoints are authenticated. The webhook endpoint verifies the CRM’s HMAC-SHA256 signature using hmac.compare_digest (constant-time comparison to prevent timing attacks). The sync endpoint requires an X-Sync-Secret header that matches a value stored in the cloud provider’s Secret Manager. The service account running the Cloud Run service has write access scoped to exactly one BigQuery dataset — it cannot read production data or touch any other resources.
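
The body of verify_cron_secret is not shown above. One way its core check might look, as a sketch: the function name sync_secret_is_valid is hypothetical, and loading the expected value from Secret Manager is left out.

```python
import hmac
from typing import Mapping


def sync_secret_is_valid(headers: Mapping[str, str],
                         expected_secret: str) -> bool:
    """Return True only if X-Sync-Secret matches the expected secret.

    Note: a plain dict lookup is case-sensitive, while web framework header
    objects are usually case-insensitive.
    """
    if not expected_secret:
        # Refuse everything if the secret was never configured.
        return False
    supplied = headers.get("X-Sync-Secret", "")
    # Compare as bytes in constant time, as the webhook signature check does.
    return hmac.compare_digest(supplied.encode(), expected_secret.encode())


if __name__ == "__main__":
    print(sync_secret_is_valid({"X-Sync-Secret": "s3cret"}, "s3cret"))
    print(sync_secret_is_valid({}, "s3cret"))
```

Inside the endpoint, a False result would translate into a 401 response, mirroring the webhook handler.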

The Numbers

  • Managed connector: ~400 euros/month for 6 tables, data in US region, limited debugging
  • Custom Cloud Run service: <5 euros/month, data in EU region, full observability
  • Development time: about 3 days for the initial build, 1 day for the comparison and cutover
  • dbt model changes required: zero
  • Downstream breakage: none

The lesson: managed ETL connectors are worth paying for when you have dozens of sources and limited engineering time. But when you have one or two sources with straightforward APIs, a custom sync can be dramatically cheaper, more transparent, and give you control over data residency. The trick is keeping the interface contract (column names, CDC semantics) identical so downstream consumers don’t need to change.
