We had been paying roughly 400 euros a month for a managed ETL connector to sync our CRM data into BigQuery. Six tables, a few thousand records each, updated a handful of times per day. The connector worked, but it had problems: data landed in a US-region dataset (we needed EU for GDPR compliance), the CDC implementation had quirks that caused phantom duplicates, and every time we needed to debug something we were staring at a black box. So I replaced it with a custom Cloud Run service. The whole thing runs for under 5 euros a month.
The Architecture
The service has two endpoints. POST /webhook receives real-time change events from the CRM’s webhook system: every time a deal, organisation, or person is created, updated, or deleted, the CRM pushes a payload to our endpoint. POST /sync runs a full reconciliation: it pulls every record from the CRM API and writes a complete snapshot to BigQuery, deactivating all previous records first. A Cloud Scheduler job triggers the full sync weekly as a safety net.
CDC Column Design
The key design decision was keeping the exact same CDC column naming that the managed connector used: _cdc_active, _cdc_start, _cdc_end, and _cdc_synced. This meant zero changes to our 200+ dbt models downstream. The transformation layer didn’t care that the data was now coming from a custom service instead of a managed connector — the column names and semantics were identical.
Each table uses BigQuery’s native partitioning on _cdc_start (by day) and clustering on id. This gives us efficient time-range scans for the dbt incremental models and fast point lookups when debugging individual records.
```sql
-- BigQuery table definitions (simplified from the CREATE TABLE statements)
--
-- Tables: deal_history, organization_history, person_history,
--         user_history, pipeline_history, stage_history
--
-- All share this CDC column pattern:
--   id           INT64      CRM record ID
--   ...                     all CRM fields as JSON-extracted columns
--   _cdc_active  BOOL       TRUE for the current version
--   _cdc_start   TIMESTAMP  when this version became active
--   _cdc_end     TIMESTAMP  when this version was superseded (NULL if active)
--   _cdc_synced  TIMESTAMP  when we wrote this row
--
-- Partitioned by: _cdc_start (DAY)
-- Clustered by:   id
```
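For reference, the partitioning and clustering above map onto BigQuery DDL roughly as follows. This is a stdlib-only sketch that renders the statement as a string. `history_table_ddl` is a hypothetical helper, and the business columns and types in the example call are assumptions, not the real CRM schema.

```python
# Columns shared by every *_history table (from the schema comment above).
CDC_COLUMNS = [
    ("_cdc_active", "BOOL"),
    ("_cdc_start", "TIMESTAMP"),
    ("_cdc_end", "TIMESTAMP"),
    ("_cdc_synced", "TIMESTAMP"),
]


def history_table_ddl(dataset: str, object_type: str,
                      business_columns: list[tuple[str, str]]) -> str:
    """Render a CREATE TABLE statement for one CDC history table (sketch)."""
    columns = [("id", "INT64"), *business_columns, *CDC_COLUMNS]
    column_sql = ",\n".join(f"  {name} {sql_type}" for name, sql_type in columns)
    return (
        f"CREATE TABLE IF NOT EXISTS `{dataset}.{object_type}_history` (\n"
        f"{column_sql}\n"
        ")\n"
        "PARTITION BY DATE(_cdc_start)\n"  # one partition per day of _cdc_start
        "CLUSTER BY id"                    # co-locate versions of the same record
    )


if __name__ == "__main__":
    # Hypothetical business columns for the deal table.
    print(history_table_ddl("crm_sync", "deal",
                            [("title", "STRING"), ("value", "NUMERIC"), ("status", "STRING")]))
```

`PARTITION BY DATE(_cdc_start)` lets time-range filters prune whole days. `CLUSTER BY id` keeps the versions of a record close together within each partition, which is what makes single-record lookups cheap.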
The Webhook Handler
When the CRM fires a webhook, it sends a JSON payload with the event type (added, updated, or deleted) and the current state of the record. The handler turns this into CDC rows. It deactivates the previous version by setting _cdc_end to now and _cdc_active to false. Then, unless the event was a deletion, it inserts a new row with the current state.
```python
import hashlib
import hmac
import json
import os
from datetime import datetime, timezone

from fastapi import FastAPI, HTTPException, Request
from google.cloud import bigquery

app = FastAPI()
bq_client = bigquery.Client()
DATASET = "crm_sync"
WEBHOOK_SECRET = os.environ["WEBHOOK_SECRET"]
# Only these object types have history tables; the name is interpolated into SQL below.
SYNCED_OBJECTS = {"deal", "organization", "person", "user", "pipeline", "stage"}


def verify_signature(body: bytes, signature: str) -> bool:
    """Verify the webhook signature using HMAC-SHA256."""
    expected = hmac.new(
        WEBHOOK_SECRET.encode(), body, hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(expected, signature)


@app.post("/webhook")
async def handle_webhook(request: Request):
    body = await request.body()
    signature = request.headers.get("X-Webhook-Signature", "")
    if not verify_signature(body, signature):
        raise HTTPException(status_code=401, detail="Invalid signature")

    payload = json.loads(body)
    event_type = payload["meta"]["action"]   # added, updated, deleted
    object_type = payload["meta"]["object"]  # deal, organization, person, ...
    if object_type not in SYNCED_OBJECTS:
        raise HTTPException(status_code=400, detail=f"Unsupported object: {object_type}")
    record = payload["current"]
    record_id = record["id"]
    now = datetime.now(timezone.utc)
    table = f"{DATASET}.{object_type}_history"

    # Step 1: Deactivate the previous active version
    deactivate_sql = f"""
        UPDATE `{table}`
        SET _cdc_active = FALSE,
            _cdc_end = @now
        WHERE id = @record_id
          AND _cdc_active = TRUE
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("now", "TIMESTAMP", now),
            bigquery.ScalarQueryParameter("record_id", "INT64", record_id),
        ]
    )
    bq_client.query(deactivate_sql, job_config=job_config).result()

    # Step 2: Insert the new version (unless the record was deleted)
    if event_type != "deleted":
        row = flatten_crm_record(record)  # maps nested CRM JSON to flat columns
        row["_cdc_active"] = True
        row["_cdc_start"] = now.isoformat()
        row["_cdc_end"] = None
        row["_cdc_synced"] = now.isoformat()
        errors = bq_client.insert_rows_json(table, [row])
        if errors:
            raise HTTPException(status_code=500, detail=str(errors))

    return {"status": "ok", "action": event_type, "id": record_id}
```
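The following in-memory model shows the two-step versioning concretely without touching BigQuery. It traces what the handler does to one history table. It is only a sketch for reasoning and testing: rows are plain dicts, and `apply_webhook_event` is a hypothetical helper. It mirrors the UPDATE-then-insert sequence, including skipping the insert on deletion.

```python
from datetime import datetime, timezone


def apply_webhook_event(history: list[dict], event_type: str,
                        record: dict, now: datetime) -> None:
    """Mutate `history` the way the webhook handler mutates a *_history table."""
    # Step 1: close every active version of this record (the UPDATE ... WHERE _cdc_active).
    for row in history:
        if row["id"] == record["id"] and row["_cdc_active"]:
            row["_cdc_active"] = False
            row["_cdc_end"] = now
    # Step 2: append the new version unless the record was deleted.
    if event_type != "deleted":
        history.append({
            **record,
            "_cdc_active": True,
            "_cdc_start": now,
            "_cdc_end": None,
            "_cdc_synced": now,
        })


deal_rows: list[dict] = []
t1 = datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc)
t2 = datetime(2024, 1, 1, 10, 0, tzinfo=timezone.utc)
t3 = datetime(2024, 1, 2, 9, 0, tzinfo=timezone.utc)
apply_webhook_event(deal_rows, "added", {"id": 42, "title": "Pilot"}, t1)
apply_webhook_event(deal_rows, "updated", {"id": 42, "title": "Pilot (won)"}, t2)
apply_webhook_event(deal_rows, "deleted", {"id": 42, "title": "Pilot (won)"}, t3)
# Two closed versions remain; no version of record 42 is active after the delete.
```

The record's full history survives the deletion. Each version's `_cdc_start`/`_cdc_end` pair brackets the period when it was current.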
The flatten_crm_record function handles the CRM’s nested JSON structure — custom fields come as hash-keyed objects like {"abc123": "value"} that need to be mapped to human-readable column names. I maintain a field mapping that’s generated from the CRM’s fields API endpoint.
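A minimal sketch of such a flattener follows. It assumes the field mapping is a plain dict from the CRM's hash keys to column names. The hash keys, the column names, and the choice to serialise nested objects as JSON strings are all illustrative assumptions. The real mapping is generated from the CRM's fields endpoint.

```python
import json
from typing import Any

# Hypothetical mapping; in practice it is generated from the CRM's fields API.
FIELD_MAPPING = {
    "abc123": "contract_type",
    "def456": "renewal_date",
}


def flatten_crm_record(record: dict[str, Any],
                       field_mapping: dict[str, str] = FIELD_MAPPING) -> dict[str, Any]:
    """Turn one CRM record into a flat BigQuery row (sketch)."""
    row: dict[str, Any] = {}
    for key, value in record.items():
        column = field_mapping.get(key, key)  # rename hash-keyed custom fields
        if isinstance(value, (dict, list)):
            # Nested objects (e.g. an embedded owner) are stored as JSON strings here.
            value = json.dumps(value, sort_keys=True)
        row[column] = value
    return row


row = flatten_crm_record({"id": 7, "title": "Renewal", "abc123": "annual", "owner_id": {"id": 3}})
print(row)  # {'id': 7, 'title': 'Renewal', 'contract_type': 'annual', 'owner_id': '{"id": 3}'}
```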
Full Sync: The Reconciliation Safety Net
Webhooks can be missed. Networks drop packets, services restart, endpoints return 5xx under load. The full sync endpoint exists to catch anything the webhooks missed. It runs weekly via Cloud Scheduler, and the logic is straightforward: deactivate everything, then write fresh.
```python
import logging

log = logging.getLogger(__name__)
OBJECT_TYPES = ["deal", "organization", "person", "user", "pipeline", "stage"]
BATCH_SIZE = 500


@app.post("/sync")
async def full_sync(request: Request):
    verify_cron_secret(request)  # X-Sync-Secret header check (defined elsewhere)
    now = datetime.now(timezone.utc)
    for object_type in OBJECT_TYPES:
        table = f"{DATASET}.{object_type}_history"

        # Deactivate all current records
        job_config = bigquery.QueryJobConfig(
            query_parameters=[bigquery.ScalarQueryParameter("now", "TIMESTAMP", now)]
        )
        bq_client.query(f"""
            UPDATE `{table}`
            SET _cdc_active = FALSE,
                _cdc_end = @now
            WHERE _cdc_active = TRUE
        """, job_config=job_config).result()

        # Fetch all records from the CRM API (paginated helper, defined elsewhere)
        records = fetch_all_from_crm(object_type)

        # Build the new active versions
        rows = []
        for record in records:
            row = flatten_crm_record(record)
            row["_cdc_active"] = True
            row["_cdc_start"] = now.isoformat()
            row["_cdc_end"] = None
            row["_cdc_synced"] = now.isoformat()
            rows.append(row)

        # Insert in batches of 500
        for i in range(0, len(rows), BATCH_SIZE):
            batch = rows[i:i + BATCH_SIZE]
            errors = bq_client.insert_rows_json(table, batch)
            if errors:
                log.error("Insert errors for %s: %s", table, errors)

    return {"status": "ok", "tables_synced": len(OBJECT_TYPES)}
```
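The sync relies on a `fetch_all_from_crm` helper that isn't shown. Below is a minimal sketch of the pagination loop and the 500-row batching. It assumes offset-based paging, where each page call returns the records plus the next offset, or `None` once the CRM reports no more pages. The page fetcher is passed in as a parameter so the sketch runs without network access, which means its signature differs from the one-argument call above. `fetch_page`, `batched`, and the fake CRM are all hypothetical.

```python
from typing import Callable, Iterator, Optional

# Hypothetical page fetcher: (object_type, start, limit) -> (records, next_start).
# next_start is None once the CRM reports there are no more pages.
PageFetcher = Callable[[str, int, int], tuple[list[dict], Optional[int]]]


def fetch_all_from_crm(object_type: str, fetch_page: PageFetcher,
                       page_size: int = 500) -> list[dict]:
    """Follow offset pagination until the fetcher signals the last page."""
    records: list[dict] = []
    start: Optional[int] = 0
    while start is not None:
        page, start = fetch_page(object_type, start, page_size)
        records.extend(page)
    return records


def batched(rows: list[dict], size: int = 500) -> Iterator[list[dict]]:
    """Yield consecutive slices of at most `size` rows, like the 500-row insert loop."""
    for i in range(0, len(rows), size):
        yield rows[i:i + size]


# A fake CRM with 1,200 deals, standing in for the real paginated API.
FAKE_DEALS = [{"id": i} for i in range(1, 1201)]


def fake_fetch_page(object_type: str, start: int,
                    limit: int) -> tuple[list[dict], Optional[int]]:
    page = FAKE_DEALS[start:start + limit]
    next_start = start + limit if start + limit < len(FAKE_DEALS) else None
    return page, next_start


deals = fetch_all_from_crm("deal", fake_fetch_page)
print(len(deals), [len(batch) for batch in batched(deals)])  # 1200 [500, 500, 200]
```

Collecting every record before inserting is fine at a few thousand rows per table. For much larger tables, you could feed each page straight into the insert batches instead.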
The MERGE Pattern for Downstream Models
Our dbt incremental models use a MERGE statement to process the CDC history tables. This pattern was already in place for the managed connector, and since we kept the same column names, it continued to work unchanged.
```sql
-- dbt incremental model pattern (simplified)
-- Picks up the latest active version of each record changed since the last run.
MERGE INTO {{ this }} AS target
USING (
  SELECT *
  FROM {{ source('crm_sync', 'deal_history') }}
  WHERE _cdc_active = TRUE
    AND _cdc_start > (
      SELECT COALESCE(MAX(_cdc_start), TIMESTAMP '1970-01-01')
      FROM {{ this }}
    )
  -- MERGE fails if one target row matches several source rows,
  -- so keep only the newest active row per id.
  QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY _cdc_start DESC) = 1
) AS source
ON target.id = source.id
WHEN MATCHED THEN
  UPDATE SET
    target.title = source.title,
    target.value = source.value,
    target.status = source.status,
    target._cdc_start = source._cdc_start,
    target._cdc_synced = source._cdc_synced
WHEN NOT MATCHED THEN
  INSERT (id, title, value, status, _cdc_start, _cdc_synced)
  VALUES (source.id, source.title, source.value, source.status,
          source._cdc_start, source._cdc_synced)
```
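The incremental logic lives in the source subquery. It takes the active rows newer than the target's high-water mark and keeps one row per id. A pure-Python model of that selection makes the semantics easy to test. `select_merge_source` is a hypothetical helper, and rows are plain dicts. Because only active rows are selected, a record whose latest event was a deletion contributes no source row.

```python
from datetime import datetime
from typing import Optional


def select_merge_source(history: list[dict],
                        watermark: Optional[datetime]) -> dict[int, dict]:
    """Newest active row per id whose _cdc_start is after the watermark.

    watermark=None plays the role of the 1970-01-01 fallback on the first run.
    """
    latest: dict[int, dict] = {}
    for row in history:
        if not row["_cdc_active"]:
            continue  # WHERE _cdc_active = TRUE
        if watermark is not None and row["_cdc_start"] <= watermark:
            continue  # AND _cdc_start > MAX(target._cdc_start)
        kept = latest.get(row["id"])
        if kept is None or row["_cdc_start"] > kept["_cdc_start"]:
            latest[row["id"]] = row  # keep only the newest active row per id
    return latest


deal_history_rows = [
    {"id": 1, "_cdc_active": False, "_cdc_start": datetime(2024, 1, 1, 1)},
    {"id": 1, "_cdc_active": True, "_cdc_start": datetime(2024, 1, 1, 3)},
    {"id": 2, "_cdc_active": True, "_cdc_start": datetime(2024, 1, 1, 2)},
    # Two active rows for id 3: the anomaly the one-row-per-id rule guards against.
    {"id": 3, "_cdc_active": True, "_cdc_start": datetime(2024, 1, 1, 5)},
    {"id": 3, "_cdc_active": True, "_cdc_start": datetime(2024, 1, 1, 6)},
]
source = select_merge_source(deal_history_rows, watermark=datetime(2024, 1, 1, 2))
print(sorted(source))  # [1, 3]
```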
Validation: Proving Parity
Before decommissioning the managed connector, I ran both systems in parallel for two weeks and wrote a comparison script to validate record counts and field-level differences across all six tables. The script queries both datasets and reports any discrepancies.
```bash
#!/bin/bash
# compare-sync.sh: validate custom sync vs managed connector.
# Compares active record counts and field mismatches for all 6 tables.
# The two datasets sit in different regions (US vs EU), and a single BigQuery
# query can't read both, so each side is exported and compared locally.

TABLES="deal organization person user pipeline stage"
OLD_DATASET="crm_managed"  # managed connector (US)
NEW_DATASET="crm_sync"     # custom sync (EU)
PROJECT="my-data-project"
WORKDIR=$(mktemp -d)

# Print sorted "id,title" lines for every active row, without the CSV header.
export_active() {
  local dataset=$1 table=$2
  bq --quiet --project_id="$PROJECT" --format=csv query \
    --use_legacy_sql=false --max_rows=1000000 \
    "SELECT id, title FROM \`${dataset}.${table}_history\` WHERE _cdc_active = TRUE" \
    | tail -n +2 | sort
}

for table in $TABLES; do
  echo "=== ${table}_history ==="
  export_active "$OLD_DATASET" "$table" > "$WORKDIR/old.csv"
  export_active "$NEW_DATASET" "$table" > "$WORKDIR/new.csv"

  # Count active records in each dataset
  old_count=$(wc -l < "$WORKDIR/old.csv" | tr -d ' ')
  new_count=$(wc -l < "$WORKDIR/new.csv" | tr -d ' ')
  echo "  Old: $old_count  New: $new_count"

  if [ "$old_count" != "$new_count" ]; then
    echo "  *** COUNT MISMATCH ***"
    # Find IDs present in one but not the other
    cut -d, -f1 "$WORKDIR/old.csv" | sort > "$WORKDIR/old_ids"
    cut -d, -f1 "$WORKDIR/new.csv" | sort > "$WORKDIR/new_ids"
    comm -23 "$WORKDIR/old_ids" "$WORKDIR/new_ids" | sed 's/^/  only_in_old: /'
    comm -13 "$WORKDIR/old_ids" "$WORKDIR/new_ids" | sed 's/^/  only_in_new: /'
  fi

  # Check field-level mismatches: (id, title) lines that differ between exports
  diff "$WORKDIR/old.csv" "$WORKDIR/new.csv" | head -n 20
  echo ""
done
rm -rf "$WORKDIR"
```
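The script spot-checks a single column and queries `title` on every table. A small Python diff can compare every shared column at once. It assumes you export the active rows from each side as whole records, for instance with `bq query --format=json`, and parse them into `{id: row}` dicts. `diff_active_rows` is a hypothetical helper. It skips the CDC columns because the two systems write them at different times.

```python
CDC_COLUMNS = {"_cdc_active", "_cdc_start", "_cdc_end", "_cdc_synced"}


def diff_active_rows(old: dict[int, dict], new: dict[int, dict]) -> dict[str, list]:
    """Compare two {id: row} snapshots of active records, ignoring CDC bookkeeping."""
    only_in_old = sorted(old.keys() - new.keys())
    only_in_new = sorted(new.keys() - old.keys())
    mismatches = []
    for record_id in sorted(old.keys() & new.keys()):
        old_row, new_row = old[record_id], new[record_id]
        # Compare every column both sides have; None == None counts as a match.
        for field in sorted((old_row.keys() & new_row.keys()) - CDC_COLUMNS):
            if old_row[field] != new_row[field]:
                mismatches.append((record_id, field, old_row[field], new_row[field]))
    return {"only_in_old": only_in_old, "only_in_new": only_in_new, "mismatches": mismatches}


old = {1: {"id": 1, "title": "A", "_cdc_synced": "2024-01-01"},
       2: {"id": 2, "title": "B"},
       4: {"id": 4, "title": None}}
new = {1: {"id": 1, "title": "A", "_cdc_synced": "2024-01-02"},
       3: {"id": 3, "title": "C"},
       4: {"id": 4, "title": "D"}}
report = diff_active_rows(old, new)
print(report)
# {'only_in_old': [2], 'only_in_new': [3], 'mismatches': [(4, 'title', None, 'D')]}
```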
After two weeks of parallel running, the comparison showed zero mismatches across all six tables: deals (5,208 active records), organisations (4,758), persons (6,904), users (45), pipelines (2), and stages (9).
Cold Start and Scaling
One gotcha with Cloud Run is cold starts. The CRM's webhooks have a short timeout, and if the container had scaled to zero, the first request would occasionally time out. I added Cloud Scheduler jobs that set min-instances to 1 during business hours and back to 0 at night and on weekends. The wake/sleep jobs call the Cloud Run Admin API with a POST carrying an X-HTTP-Method-Override: PATCH header (Cloud Scheduler does not support PATCH directly). This keeps weekday webhook latency under 200ms while still saving money outside business hours.
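The wake/sleep policy reduces to a small predicate. This sketch uses a hypothetical 08:00 to 19:00 weekday window, since the exact hours aren't stated. The cron strings in the comments are unix-cron schedules that two Cloud Scheduler jobs could use at the window edges, with each job's time zone set to your local zone.

```python
from datetime import datetime, time

# Hypothetical business hours; matching Cloud Scheduler schedules would be:
#   wake:  "0 8 * * 1-5"   (set min-instances to 1)
#   sleep: "0 19 * * 1-5"  (set min-instances to 0)
BUSINESS_START = time(8, 0)
BUSINESS_END = time(19, 0)


def desired_min_instances(now: datetime) -> int:
    """Return 1 inside weekday business hours (in now's own time zone), else 0."""
    is_weekday = now.weekday() < 5  # Monday=0 ... Friday=4
    in_hours = BUSINESS_START <= now.time() < BUSINESS_END
    return 1 if is_weekday and in_hours else 0


print(desired_min_instances(datetime(2024, 1, 1, 9, 30)))  # Monday morning: 1
print(desired_min_instances(datetime(2024, 1, 6, 9, 30)))  # Saturday: 0
```

The end of the window is exclusive, so a job firing exactly at 19:00 sees 0 and puts the service back to sleep.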
Security
Both endpoints are authenticated. The webhook endpoint verifies the CRM's HMAC-SHA256 signature using hmac.compare_digest, a constant-time comparison that prevents timing attacks. The sync endpoint requires an X-Sync-Secret header that matches a value stored in Google Cloud Secret Manager. The service account running the Cloud Run service has write access scoped to exactly one BigQuery dataset, so it cannot read production data or touch any other resources.
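You can exercise the same HMAC scheme locally, for example to craft test requests against the webhook endpoint. In this stdlib-only sketch, `sign_body` produces a bare hex digest, which is what the handler above compares against `X-Webhook-Signature`. Whether this CRM sends exactly that format is an assumption, since some providers add a prefix or use base64. `check_sync_secret` is a hypothetical stand-in for the body of `verify_cron_secret`, whose implementation isn't shown.

```python
import hashlib
import hmac
from typing import Optional


def sign_body(body: bytes, secret: str) -> str:
    """Hex HMAC-SHA256 of the raw request body (the value sent as X-Webhook-Signature)."""
    return hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()


def verify_body(body: bytes, signature: str, secret: str) -> bool:
    """Constant-time check of a received signature against the expected one."""
    return hmac.compare_digest(sign_body(body, secret).encode(), signature.encode())


def check_sync_secret(header_value: Optional[str], expected: str) -> bool:
    """Hypothetical core of verify_cron_secret(): compare X-Sync-Secret in constant time."""
    if not header_value:
        return False
    return hmac.compare_digest(header_value.encode(), expected.encode())


body = b'{"meta": {"action": "updated", "object": "deal"}}'
signature = sign_body(body, "test-secret")
print(verify_body(body, signature, "test-secret"))         # True
print(verify_body(body + b" ", signature, "test-secret"))  # False: body was altered
```

The signature must be computed over the raw bytes exactly as received. That is why the handler reads `request.body()` before parsing the JSON.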
The Numbers
- Managed connector: ~400 euros/month for 6 tables, data in US region, limited debugging
- Custom Cloud Run service: <5 euros/month, data in EU region, full observability
- Development time: about 3 days for the initial build, 1 day for the comparison and cutover
- dbt model changes required: zero
- Downstream breakage: none
The lesson: managed ETL connectors are worth paying for when you have dozens of sources and limited engineering time. But when you have one or two sources with straightforward APIs, a custom sync can be dramatically cheaper, more transparent, and give you control over data residency. The trick is keeping the interface contract (column names, CDC semantics) identical so downstream consumers don’t need to change.
