We had a managed connector (think Fivetran, Airbyte, or similar) syncing our helpdesk data — tickets, contacts, threads, comments — from a SaaS helpdesk platform into BigQuery. It worked fine until it did not: the connector would miss webhook events, lag behind on incremental syncs, and occasionally produce phantom duplicates that our dbt models would faithfully propagate into dashboards. The cost was also non-trivial for what amounted to six tables.
I replaced it with a custom Cloud Run service that receives webhooks for real-time CDC and runs scheduled full reconciliations. The entire migration, including dbt source changes, took about a week. Here is how it works.
Architecture Overview
The high-level architecture:
                                   ┌──────────────────────────┐
                                   │     Cloud Scheduler      │
                                   │  (daily + weekly full)   │
                                   └────────────┬─────────────┘
                                                │ POST /sync
                                                ▼
┌──────────────┐   webhooks    ┌──────────────────────────────────┐
│   Helpdesk   │──────────────►│        Cloud Run Service         │
│   Platform   │ POST /webhook │        (Python / FastAPI)        │
│    (SaaS)    │               │                                  │
│              │◄──────────────│  OAuth2 token refresh            │
│              │   API calls   │  Incremental search API sync     │
│              │               │  Webhook event processing        │
└──────────────┘               │  Full reconciliation mode        │
                               └────────────────┬─────────────────┘
                                                │
                                                │ BigQuery writes
                                                ▼
                                 ┌─────────────────────────────┐
                                 │      BigQuery Dataset       │
                                 │  (CDC tables with           │
                                 │   _fivetran_* columns)      │
                                 │                             │
                                 │  tickets_history            │
                                 │  contacts_history           │
                                 │  threads_history            │
                                 │  comments_history           │
                                 │  agents_history             │
                                 │  departments_history        │
                                 └──────────────┬──────────────┘
                                                │
                                                ▼
                                 ┌─────────────────────────────┐
                                 │         dbt Models          │
                                 │  (zero changes needed --    │
                                 │   same CDC column names)    │
                                 └─────────────────────────────┘
Two entry points into the service:
- POST /webhook — receives real-time events from the helpdesk platform (ticket created, updated, commented, etc.)
- POST /sync — triggered by Cloud Scheduler for incremental and full reconciliation syncs
Challenge 1: OAuth Token Refresh
The helpdesk platform uses OAuth2 with refresh tokens. The access token expires every hour. The refresh token itself is long-lived but must be stored securely and updated whenever it rotates (some OAuth implementations rotate refresh tokens on every use).
I store the refresh token in Google Secret Manager and update it after every token refresh:
"""oauth.py -- OAuth2 token management with Secret Manager storage."""
import time
import json
import urllib.request
import urllib.parse

from google.cloud import secretmanager

# Cache the access token in memory
_token_cache = {"access_token": None, "expires_at": 0}

SECRET_PROJECT = "my-gcp-project"
REFRESH_TOKEN_SECRET = "helpdesk-oauth-refresh-token"
CLIENT_ID_SECRET = "helpdesk-oauth-client-id"
CLIENT_SECRET_SECRET = "helpdesk-oauth-client-secret"
TOKEN_URL = "https://accounts.helpdesk-platform.com/oauth/v2/token"


def _get_secret(name: str) -> str:
    """Read a secret from Google Secret Manager."""
    client = secretmanager.SecretManagerServiceClient()
    path = f"projects/{SECRET_PROJECT}/secrets/{name}/versions/latest"
    response = client.access_secret_version(request={"name": path})
    return response.payload.data.decode("utf-8").strip()


def _update_secret(name: str, value: str):
    """Write a new version of a secret to Secret Manager."""
    client = secretmanager.SecretManagerServiceClient()
    parent = f"projects/{SECRET_PROJECT}/secrets/{name}"
    client.add_secret_version(
        request={
            "parent": parent,
            "payload": {"data": value.encode("utf-8")},
        }
    )


def get_access_token() -> str:
    """Get a valid access token, refreshing if necessary."""
    now = time.time()

    # Return cached token if still valid (with 60s buffer)
    if _token_cache["access_token"] and _token_cache["expires_at"] > now + 60:
        return _token_cache["access_token"]

    # Refresh the token
    client_id = _get_secret(CLIENT_ID_SECRET)
    client_secret = _get_secret(CLIENT_SECRET_SECRET)
    refresh_token = _get_secret(REFRESH_TOKEN_SECRET)

    data = urllib.parse.urlencode({
        "grant_type": "refresh_token",
        "client_id": client_id,
        "client_secret": client_secret,
        "refresh_token": refresh_token,
    }).encode()
    req = urllib.request.Request(TOKEN_URL, data=data, method="POST")
    req.add_header("Content-Type", "application/x-www-form-urlencoded")
    with urllib.request.urlopen(req) as resp:
        token_data = json.loads(resp.read())

    _token_cache["access_token"] = token_data["access_token"]
    _token_cache["expires_at"] = now + token_data.get("expires_in", 3600)

    # If the refresh token was rotated, update Secret Manager
    new_refresh = token_data.get("refresh_token")
    if new_refresh and new_refresh != refresh_token:
        _update_secret(REFRESH_TOKEN_SECRET, new_refresh)

    return _token_cache["access_token"]


def api_request(method: str, endpoint: str, params: dict = None,
                body: dict = None) -> dict:
    """Make an authenticated API request to the helpdesk platform."""
    token = get_access_token()
    url = f"https://desk.helpdesk-platform.com/api/v1{endpoint}"
    if params:
        url += "?" + urllib.parse.urlencode(params)
    data = json.dumps(body).encode() if body else None
    req = urllib.request.Request(url, data=data, method=method)
    req.add_header("Authorization", f"Zoho-oauthtoken {token}")
    req.add_header("Content-Type", "application/json")
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())
Challenge 2: Incremental Sync via Search API
The helpdesk’s list endpoints do not support filtering by modification date. But the search endpoint does. For incremental syncs, I query the search API for records modified since the last sync, then fetch full details for each one.
A critical detail: the search API has pagination limits (typically 50 results per page) and is rate-limited. I built a generic paginating fetcher:
"""sync.py -- Incremental and full sync logic."""
import logging
from datetime import datetime, timedelta, timezone

from google.cloud import bigquery

from cdc import write_to_bigquery
from oauth import api_request

logger = logging.getLogger(__name__)

BQ_CLIENT = bigquery.Client(project="my-gcp-project")
DATASET = "helpdesk_sync"
PAGE_SIZE = 50
TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.000Z"


def fetch_all_pages(endpoint: str, params: dict, key: str) -> list:
    """Paginate through all results from an API endpoint.

    The helpdesk API uses offset-based pagination with a 'from' parameter.
    IMPORTANT: A page with exactly PAGE_SIZE results means there may be more
    pages, so keep requesting until a short or empty page comes back.
    """
    all_records = []
    offset = 0
    while True:
        page_params = {**params, "from": offset, "limit": PAGE_SIZE}
        response = api_request("GET", endpoint, params=page_params)
        # `key` names the response field that holds the list of records
        records = response.get(key, [])
        if not records:
            break
        all_records.extend(records)
        logger.info(f"Fetched {len(all_records)} records from {endpoint}")
        # If we got fewer than PAGE_SIZE, we have reached the last page
        if len(records) < PAGE_SIZE:
            break
        offset += PAGE_SIZE
    return all_records


def get_last_sync_time(table: str) -> datetime:
    """Get the most recent _fivetran_synced timestamp from BigQuery."""
    query = f"""
        SELECT MAX(_fivetran_synced) AS last_sync
        FROM `{BQ_CLIENT.project}.{DATASET}.{table}`
    """
    rows = list(BQ_CLIENT.query(query).result())
    if rows and rows[0].last_sync:
        return rows[0].last_sync.replace(tzinfo=timezone.utc)
    # Default: sync from 30 days ago
    return datetime.now(timezone.utc) - timedelta(days=30)


def sync_table_incremental(table_config: dict):
    """Incremental sync: fetch records modified since last sync."""
    table_name = table_config["table"]
    search_endpoint = table_config["search_endpoint"]
    detail_endpoint = table_config["detail_endpoint"]
    id_field = table_config.get("id_field", "id")

    if not search_endpoint:
        # No search endpoint configured: this table cannot be synced
        # incrementally here and is left to the full reconciliation.
        logger.info(f"Skipping incremental sync for {table_name}: no search endpoint")
        return 0

    last_sync = get_last_sync_time(f"{table_name}_history")
    # Overlap the window by 1 second to avoid missing records at the boundary
    since = (last_sync - timedelta(seconds=1)).strftime(TIME_FORMAT)
    until = datetime.now(timezone.utc).strftime(TIME_FORMAT)
    logger.info(f"Incremental sync {table_name} since {since}")

    # Search for modified records
    search_params = {
        "modifiedTimeRange": f"{since},{until}",
        "sortBy": "modifiedTime",
    }
    records = fetch_all_pages(search_endpoint, search_params, "data")
    logger.info(f"Found {len(records)} modified {table_name} records")
    if not records:
        return 0

    # Fetch full details for each record (search results are partial)
    full_records = []
    for record in records:
        record_id = record[id_field]
        try:
            detail = api_request("GET", f"{detail_endpoint}/{record_id}")
            full_records.append(detail)
        except Exception as e:
            logger.error(f"Failed to fetch {table_name} {record_id}: {e}")

    # Write to BigQuery with CDC columns
    write_to_bigquery(table_name, full_records)
    return len(full_records)
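The search API is rate-limited, but the fetcher above issues requests back to back. One way to cope is to wrap each call in a retry with exponential backoff. The following is a minimal sketch rather than the service's actual code: `with_backoff`, `RETRYABLE_STATUS`, and the choice of status codes are assumptions, since the platform's rate-limit response is not documented here.

```python
import random
import time
import urllib.error
from typing import Callable, TypeVar

T = TypeVar("T")

# Assumption: the platform signals rate limiting or transient failure with
# one of these HTTP status codes.
RETRYABLE_STATUS = {429, 500, 502, 503, 504}


def with_backoff(
    call: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run `call`, retrying retryable HTTP errors with exponential backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return call()
        except urllib.error.HTTPError as err:
            # Give up on non-retryable errors and on the final attempt
            if err.code not in RETRYABLE_STATUS or attempt == max_attempts:
                raise
            delay = base_delay * 2 ** (attempt - 1)
            # Jitter spreads out retries from concurrent workers
            sleep(delay + random.uniform(0, delay / 2))
    raise AssertionError("unreachable")
```

Inside `fetch_all_pages`, the call could then become `with_backoff(lambda: api_request("GET", endpoint, params=page_params))`. The `sleep` parameter exists only so the delay can be replaced in tests.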
Challenge 3: CDC Column Naming (The Zero-Change Migration)
This was the key architectural decision. Our dbt models expected CDC columns from the old managed connector:
- _fivetran_active (BOOLEAN): is this the current version of the record?
- _fivetran_start (TIMESTAMP): when this version became active
- _fivetran_end (TIMESTAMP): when this version was superseded (NULL if current)
- _fivetran_synced (TIMESTAMP): when the sync captured this version
Rather than changing the dbt models (which would cascade through multiple layers and require a full-refresh), I kept the exact same column names in the new dataset. The CDC logic in the write function implements the same semantics:
"""cdc.py -- CDC (Change Data Capture) write logic with Fivetran-compatible columns."""
import json
import hashlib
import logging
from datetime import datetime, timezone

from google.api_core.exceptions import NotFound
from google.cloud import bigquery

logger = logging.getLogger(__name__)

BQ_CLIENT = bigquery.Client(project="my-gcp-project")
DATASET = "helpdesk_sync"


def compute_row_hash(record: dict) -> str:
    """Deterministic hash of record contents for change detection."""
    # Sort keys for deterministic serialisation
    serialised = json.dumps(record, sort_keys=True, default=str)
    return hashlib.md5(serialised.encode()).hexdigest()


def write_to_bigquery(table_name: str, records: list):
    """Write records to BigQuery with Fivetran-compatible CDC columns.

    For each record:
    1. Check if the record already exists and is active
    2. If it exists and content changed: deactivate old row, insert new active row
    3. If it exists and content unchanged: skip (idempotent)
    4. If it does not exist: insert new active row
    """
    if not records:
        return

    history_table = f"{BQ_CLIENT.project}.{DATASET}.{table_name}_history"
    now = datetime.now(timezone.utc)
    ids = [str(r["id"]) for r in records]

    # Fetch current active rows for these IDs. The IDs are passed as a query
    # parameter instead of being interpolated into the SQL string.
    existing_query = f"""
        SELECT id, _fivetran_start, _row_hash
        FROM `{history_table}`
        WHERE _fivetran_active = TRUE
          AND CAST(id AS STRING) IN UNNEST(@ids)
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[bigquery.ArrayQueryParameter("ids", "STRING", ids)]
    )
    existing = {}
    try:
        for row in BQ_CLIENT.query(existing_query, job_config=job_config).result():
            existing[str(row.id)] = {
                "start": row._fivetran_start,
                "hash": row._row_hash,
            }
    except NotFound:
        pass  # Table does not exist yet on first sync

    rows_to_deactivate = []
    rows_to_insert = []
    for record in records:
        record_id = str(record["id"])
        row_hash = compute_row_hash(record)
        if record_id in existing:
            if existing[record_id]["hash"] == row_hash:
                # No change -- skip
                continue
            # Content changed -- deactivate old row
            rows_to_deactivate.append({
                "id": record_id,
                "old_start": existing[record_id]["start"],
            })
        # Insert new active row with Fivetran-compatible CDC columns
        row = flatten_record(record)
        row["_fivetran_active"] = True
        row["_fivetran_start"] = now.isoformat()
        row["_fivetran_end"] = None  # NULL = currently active
        row["_fivetran_synced"] = now.isoformat()
        row["_row_hash"] = row_hash
        rows_to_insert.append(row)

    # Deactivate old rows (set _fivetran_active=FALSE, _fivetran_end=now)
    if rows_to_deactivate:
        deactivate_ids = [r["id"] for r in rows_to_deactivate]
        deactivate_query = f"""
            UPDATE `{history_table}`
            SET _fivetran_active = FALSE,
                _fivetran_end = @now
            WHERE _fivetran_active = TRUE
              AND CAST(id AS STRING) IN UNNEST(@ids)
        """
        job_config = bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ScalarQueryParameter("now", "TIMESTAMP", now),
                bigquery.ArrayQueryParameter("ids", "STRING", deactivate_ids),
            ]
        )
        BQ_CLIENT.query(deactivate_query, job_config=job_config).result()

    # Insert new rows.
    # NOTE: insert_rows_json uses the streaming API, and BigQuery can reject
    # UPDATE statements that touch rows still in the streaming buffer, so a
    # record that changes again shortly after being written may need a retry.
    if rows_to_insert:
        errors = BQ_CLIENT.insert_rows_json(history_table, rows_to_insert)
        if errors:
            raise RuntimeError(f"BigQuery insert errors: {errors}")

    logger.info(
        f"{table_name}: {len(rows_to_insert)} inserted, "
        f"{len(rows_to_deactivate)} deactivated, "
        f"{len(records) - len(rows_to_insert)} unchanged"
    )


def flatten_record(record: dict, prefix: str = "") -> dict:
    """Flatten nested dicts into underscore-separated column names.

    Helpdesk API returns nested objects (e.g., contact.name becomes
    contact_name). BigQuery needs flat columns.
    """
    flat = {}
    for key, value in record.items():
        col_name = f"{prefix}_{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten_record(value, col_name))
        elif isinstance(value, list):
            flat[col_name] = json.dumps(value)
        else:
            flat[col_name] = value
    return flat
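To see the CDC semantics without BigQuery in the way, the same four-step logic can be run against a plain Python list. This is an illustrative sketch only: `apply_change` and `row_hash` are hypothetical names, and it mirrors the write logic described in this article, not Fivetran's own implementation.

```python
import hashlib
import json
from datetime import datetime, timezone


def row_hash(record: dict) -> str:
    """Deterministic content hash (keys sorted before serialising)."""
    serialised = json.dumps(record, sort_keys=True, default=str)
    return hashlib.md5(serialised.encode()).hexdigest()


def apply_change(history: list, record: dict, now: datetime) -> str:
    """Apply one record to an in-memory history table.

    Returns 'inserted', 'updated', or 'unchanged'.
    """
    new_hash = row_hash(record)
    active = next(
        (r for r in history
         if r["id"] == record["id"] and r["_fivetran_active"]),
        None,
    )
    if active is not None:
        if active["_row_hash"] == new_hash:
            return "unchanged"  # idempotent: same content, no new version
        # Close the previous version
        active["_fivetran_active"] = False
        active["_fivetran_end"] = now
    history.append({
        **record,
        "_fivetran_active": True,
        "_fivetran_start": now,
        "_fivetran_end": None,  # None (NULL) marks the current version
        "_fivetran_synced": now,
        "_row_hash": new_hash,
    })
    return "updated" if active is not None else "inserted"


history = []
t1 = datetime(2024, 1, 1, tzinfo=timezone.utc)
t2 = datetime(2024, 1, 2, tzinfo=timezone.utc)
apply_change(history, {"id": "1", "status": "open"}, t1)    # new record
apply_change(history, {"id": "1", "status": "open"}, t2)    # duplicate, skipped
apply_change(history, {"id": "1", "status": "closed"}, t2)  # new version
```

After these three calls the history holds two rows for ticket 1: the closed-off "open" version and the active "closed" version.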
Because the column names are identical, the dbt source definitions only needed the dataset name changed:
# dbt source definition -- the ONLY change needed
sources:
  - name: helpdesk
    # OLD: dataset from managed connector
    # database: my-project-raw
    # schema: helpdesk_fivetran
    # NEW: dataset from custom sync
    database: my-project-raw
    schema: helpdesk_sync
    tables:
      - name: tickets_history
      - name: contacts_history
      - name: threads_history
      - name: comments_history
      - name: agents_history
      - name: departments_history
Zero changes to any downstream dbt model. The staging models, intermediate models, and reporting models all work unchanged because they reference source('helpdesk', 'tickets_history') and the CDC column semantics are identical.
Challenge 4: Webhook Event Ordering
Webhooks arrive in near-real-time, but not necessarily in order. If a ticket is updated twice in quick succession, the webhook for the second update might arrive before the first. If we process them naively, we end up with the old state overwriting the new state.
The solution has two parts:
- Always fetch the current state from the API — never trust the webhook payload as the source of truth. The webhook tells us what changed; we then fetch the full current record from the API to get the actual state.
- Use the row hash for idempotency — if the current state matches what we already have in BigQuery, skip the write. This handles duplicate webhooks and out-of-order processing gracefully.
"""webhook.py -- Webhook event handler."""
import hmac
import hashlib
import logging
from datetime import datetime, timezone

from fastapi import Request, Response
from google.cloud import bigquery

from cdc import BQ_CLIENT, DATASET, write_to_bigquery
from oauth import api_request
from sync import TABLE_CONFIGS

logger = logging.getLogger(__name__)

WEBHOOK_SECRET = None  # Loaded from Secret Manager at startup


def verify_webhook_signature(payload: bytes, signature: str) -> bool:
    """Verify the webhook payload signature using HMAC-SHA256."""
    if not WEBHOOK_SECRET:
        return False
    expected = hmac.new(
        WEBHOOK_SECRET.encode(), payload, hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(expected, signature)


# Registered as POST /webhook in main.py
async def handle_webhook(request: Request):
    """Process incoming webhook events from the helpdesk platform."""
    payload = await request.body()
    signature = request.headers.get("X-Webhook-Signature", "")
    if not verify_webhook_signature(payload, signature):
        return Response(status_code=401, content="Invalid signature")

    event = await request.json()
    event_type = event.get("eventType", "")
    entity_type = event.get("entity", {}).get("type", "")
    entity_id = event.get("entity", {}).get("id")
    logger.info(f"Webhook: {event_type} on {entity_type} {entity_id}")

    # Map entity type to table config
    table_config = None
    for config in TABLE_CONFIGS:
        if config["entity_type"] == entity_type:
            table_config = config
            break
    if not table_config:
        logger.warning(f"Unknown entity type: {entity_type}")
        return {"status": "ignored"}

    if event_type in ("delete", "trash"):
        # For deletes: deactivate the record without inserting a new version
        handle_delete(table_config["table"], entity_id)
        return {"status": "deleted"}

    # For creates and updates: fetch current state from API
    # (never trust webhook payload -- it may be partial or stale)
    try:
        record = api_request(
            "GET", f"{table_config['detail_endpoint']}/{entity_id}"
        )
    except Exception as e:
        logger.error(f"Failed to fetch {entity_type} {entity_id}: {e}")
        return Response(status_code=500, content=str(e))

    # Write to BigQuery (idempotent -- skips if content unchanged)
    write_to_bigquery(table_config["table"], [record])
    return {"status": "processed"}


def handle_delete(table_name: str, record_id: str):
    """Deactivate a deleted record in BigQuery."""
    now = datetime.now(timezone.utc)
    history_table = f"{BQ_CLIENT.project}.{DATASET}.{table_name}_history"
    # The record ID comes from the webhook body, so it is passed as a query
    # parameter instead of being interpolated into the SQL string.
    query = f"""
        UPDATE `{history_table}`
        SET _fivetran_active = FALSE,
            _fivetran_end = @now
        WHERE _fivetran_active = TRUE
          AND CAST(id AS STRING) = @record_id
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("now", "TIMESTAMP", now),
            bigquery.ScalarQueryParameter("record_id", "STRING", str(record_id)),
        ]
    )
    BQ_CLIENT.query(query, job_config=job_config).result()
    logger.info(f"Deactivated deleted {table_name} record {record_id}")
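To exercise the handler locally, a test request has to carry a signature that the verification step accepts. The sketch below shows both sides of that check using only the standard library. It assumes, as the handler does, that the platform signs the raw request body with HMAC-SHA256 and sends the hex digest; `sign_payload`, `verify_signature`, and the secret value are hypothetical.

```python
import hashlib
import hmac
import json


def sign_payload(secret: str, payload: bytes) -> str:
    """Hex-encoded HMAC-SHA256 of the raw request body."""
    return hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()


def verify_signature(secret: str, payload: bytes, signature: str) -> bool:
    """Constant-time check of a received signature against the expected one."""
    if not secret:
        return False  # refuse everything if no secret is configured
    return hmac.compare_digest(sign_payload(secret, payload), signature)


secret = "test-secret"  # hypothetical value, for local testing only
body = json.dumps(
    {"eventType": "update", "entity": {"type": "ticket", "id": "42"}}
).encode()
signature = sign_payload(secret, body)

accepted = verify_signature(secret, body, signature)
tampered = verify_signature(secret, body + b" ", signature)
```

The signature is computed over the exact bytes of the body, which is why the handler reads `request.body()` before parsing JSON: re-serialising the parsed event could change whitespace or key order and break the comparison.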
Challenge 5: Full Reconciliation
Webhooks can be lost (platform outages, network issues, our service being down during a deploy). The scheduled full reconciliation catches anything that fell through the cracks.
A “full” sync does not mean re-ingesting everything from scratch. It means:
- Fetch all record IDs and modification timestamps from the API
- Compare with what we have in BigQuery
- Fetch and upsert only the records that are missing or have a newer modification timestamp
def sync_table_full(table_config: dict):
    """Full reconciliation: compare API state with BigQuery state."""
    # Imported here rather than at module level because webhook.py itself
    # imports TABLE_CONFIGS from sync.py
    from webhook import handle_delete

    table_name = table_config["table"]
    list_endpoint = table_config["list_endpoint"]
    detail_endpoint = table_config["detail_endpoint"]
    logger.info(f"Full reconciliation for {table_name}")

    # Step 1: Get all records from the API (paginated)
    api_records = fetch_all_pages(list_endpoint, {}, "data")
    api_index = {str(r["id"]): r.get("modifiedTime", "") for r in api_records}
    logger.info(f"API has {len(api_index)} {table_name} records")

    # Step 2: Get all active records from BigQuery
    history_table = f"{BQ_CLIENT.project}.{DATASET}.{table_name}_history"
    bq_query = f"""
        SELECT CAST(id AS STRING) AS id, _fivetran_synced
        FROM `{history_table}`
        WHERE _fivetran_active = TRUE
    """
    bq_index = {}
    try:
        for row in BQ_CLIENT.query(bq_query).result():
            bq_index[row.id] = row._fivetran_synced
    except Exception:
        pass  # Table may not exist

    # Step 3: Find records that need updating
    to_fetch = []
    # Records in API but not in BQ (new)
    for record_id in api_index:
        if record_id not in bq_index:
            to_fetch.append(record_id)
    # Records in API that are newer than BQ version
    for record_id, modified_time in api_index.items():
        if record_id in bq_index:
            # Compare modification times (API time may be more recent).
            # NOTE: this is a string comparison, so it relies on both sides
            # using the same ISO-8601 layout.
            if modified_time and modified_time > bq_index[record_id].isoformat():
                to_fetch.append(record_id)

    # Records in BQ but not in API (deleted in source)
    deleted_ids = set(bq_index.keys()) - set(api_index.keys())
    for record_id in deleted_ids:
        handle_delete(table_name, record_id)

    # deleted_ids are not part of api_index, so they are not subtracted here
    logger.info(
        f"Reconciliation: {len(to_fetch)} to fetch, "
        f"{len(deleted_ids)} deleted, "
        f"{len(api_index) - len(to_fetch)} unchanged"
    )

    # Step 4: Fetch and write updated records
    if to_fetch:
        full_records = []
        for record_id in to_fetch:
            try:
                record = api_request("GET", f"{detail_endpoint}/{record_id}")
                full_records.append(record)
            except Exception as e:
                logger.error(f"Failed to fetch {table_name} {record_id}: {e}")
        write_to_bigquery(table_name, full_records)
    return len(to_fetch)
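The reconciliation compares the API's `modifiedTime` string against an `isoformat()` string. That works only while both sides share one layout: a value ending in `.000Z` sorts after the same instant written with a `+00:00` offset, so unchanged records can be re-fetched. A more robust variant parses both sides into datetimes first. This is a sketch under the assumption that the API returns ISO-8601 timestamps; `parse_api_time` and `is_newer` are hypothetical helpers.

```python
from datetime import datetime, timezone


def parse_api_time(value: str) -> datetime:
    """Parse an ISO-8601 string such as '2024-05-01T10:00:00.000Z'."""
    # datetime.fromisoformat() accepts a trailing 'Z' only from Python 3.11,
    # so normalise it to an explicit UTC offset first.
    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        # Assumption: timestamps without an offset are in UTC
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


def is_newer(api_modified: str, bq_synced: datetime) -> bool:
    """True if the API record changed after the BigQuery row was synced."""
    if not api_modified:
        return False
    return parse_api_time(api_modified) > bq_synced


synced = datetime(2024, 5, 1, 10, 0, 0, tzinfo=timezone.utc)
same_instant = is_newer("2024-05-01T10:00:00.000Z", synced)
one_second_later = is_newer("2024-05-01T10:00:01.000Z", synced)
```

`bq_synced` is expected to be timezone-aware, which is how the BigQuery client returns TIMESTAMP columns.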
The FastAPI Application
The Cloud Run service ties everything together:
"""main.py -- Cloud Run entry point."""
import os
import hmac
import logging

from fastapi import FastAPI, Request, Response

from sync import sync_table_incremental, sync_table_full, TABLE_CONFIGS
from webhook import handle_webhook

app = FastAPI()
logger = logging.getLogger(__name__)

# Auth for scheduled triggers
SYNC_SECRET = os.environ.get("SYNC_SECRET", "")


@app.get("/")
async def health():
    """Health check."""
    return {"status": "ok", "tables": len(TABLE_CONFIGS)}


@app.post("/sync")
async def sync(request: Request):
    """Triggered by Cloud Scheduler. Runs incremental or full sync."""
    # Authenticate via shared secret. An unset SYNC_SECRET rejects every
    # request instead of matching an empty header.
    provided_secret = request.headers.get("X-Sync-Secret", "")
    if not SYNC_SECRET or not hmac.compare_digest(
        provided_secret.encode(), SYNC_SECRET.encode()
    ):
        return Response(status_code=403, content="Forbidden")

    mode = request.query_params.get("mode", "incremental")
    results = {}
    for config in TABLE_CONFIGS:
        table_name = config["table"]
        try:
            if mode == "full":
                count = sync_table_full(config)
            else:
                count = sync_table_incremental(config)
            results[table_name] = {"status": "ok", "records": count}
        except Exception as e:
            logger.error(f"Sync failed for {table_name}: {e}")
            results[table_name] = {"status": "error", "error": str(e)}
    return {"mode": mode, "results": results}


@app.post("/webhook")
async def webhook_handler(request: Request):
    """Receives real-time events from the helpdesk platform."""
    return await handle_webhook(request)


# Table configurations. These belong in sync.py, which is where main.py and
# webhook.py import TABLE_CONFIGS from.
TABLE_CONFIGS = [
    {
        "table": "tickets",
        "entity_type": "ticket",
        "list_endpoint": "/tickets",
        "search_endpoint": "/tickets/search",
        "detail_endpoint": "/tickets",
    },
    {
        "table": "contacts",
        "entity_type": "contact",
        "list_endpoint": "/contacts",
        "search_endpoint": "/contacts/search",
        "detail_endpoint": "/contacts",
    },
    {
        "table": "threads",
        "entity_type": "thread",
        "list_endpoint": "/tickets/{ticket_id}/threads",
        "search_endpoint": None,  # threads synced via parent ticket
        "detail_endpoint": "/tickets/{ticket_id}/threads",
    },
    {
        "table": "agents",
        "entity_type": "agent",
        "list_endpoint": "/agents",
        "search_endpoint": None,
        "detail_endpoint": "/agents",
    },
    {
        "table": "departments",
        "entity_type": "department",
        "list_endpoint": "/departments",
        "search_endpoint": None,
        "detail_endpoint": "/departments",
    },
]
Cloud Scheduler Setup
Two scheduler jobs handle the automated syncs:
#!/bin/bash
# setup-scheduler.sh
PROJECT="my-gcp-project"
REGION="europe-west3"
SERVICE_URL="https://helpdesk-sync-abc123.europe-west3.run.app"
SYNC_SECRET=$(gcloud secrets versions access latest \
--secret=helpdesk-sync-secret --project=$PROJECT)
# Daily incremental sync (catches missed webhooks)
gcloud scheduler jobs create http helpdesk-daily-sync \
--project=$PROJECT \
--location=$REGION \
--schedule="0 5 * * *" \
--time-zone="Europe/Amsterdam" \
--uri="$SERVICE_URL/sync" \
--http-method=POST \
--headers="X-Sync-Secret=$SYNC_SECRET" \
--oidc-service-account-email="scheduler-sa@$PROJECT.iam.gserviceaccount.com"
# Weekly full reconciliation (catches any drift)
gcloud scheduler jobs create http helpdesk-weekly-full-sync \
--project=$PROJECT \
--location=$REGION \
--schedule="0 4 * * 0" \
--time-zone="Europe/Amsterdam" \
--uri="$SERVICE_URL/sync?mode=full" \
--http-method=POST \
--headers="X-Sync-Secret=$SYNC_SECRET" \
--oidc-service-account-email="scheduler-sa@$PROJECT.iam.gserviceaccount.com"
Cold Start Handling
Cloud Run scales to zero by default. A webhook arriving when the service is cold leads to a 5-10 second cold start, which can cause the helpdesk platform to time out and retry. Retries are fine (our write logic is idempotent), but the timeout logs create noise.
I solved this with a pair of Cloud Scheduler jobs that toggle min-instances between 1 (during business hours) and 0 (nights and weekends):
# Wake up (business hours)
gcloud scheduler jobs create http helpdesk-sync-wake \
--schedule="0 7 * * 1-5" \
--time-zone="Europe/Amsterdam" \
--uri="https://run.googleapis.com/v2/projects/$PROJECT/locations/$REGION/services/helpdesk-sync" \
--http-method=POST \
--headers="X-HTTP-Method-Override=PATCH,Content-Type=application/json" \
--message-body='{"scaling":{"minInstanceCount":1}}' \
--oauth-service-account-email="cloud-run-admin@$PROJECT.iam.gserviceaccount.com"
# Sleep (evenings)
gcloud scheduler jobs create http helpdesk-sync-sleep \
--schedule="0 18 * * 1-5" \
--time-zone="Europe/Amsterdam" \
--uri="https://run.googleapis.com/v2/projects/$PROJECT/locations/$REGION/services/helpdesk-sync" \
--http-method=POST \
--headers="X-HTTP-Method-Override=PATCH,Content-Type=application/json" \
--message-body='{"scaling":{"minInstanceCount":0}}' \
--oauth-service-account-email="cloud-run-admin@$PROJECT.iam.gserviceaccount.com"
Note the X-HTTP-Method-Override: PATCH header — Cloud Scheduler does not natively support PATCH requests, so we use the override header with a POST.
The Migration Cutover
The actual migration followed this sequence:
1. Deploy the service and run a full sync: populates the new dataset alongside the old one
2. Enable webhooks on the helpdesk platform: new events flow into both the old connector and the new service
3. Run comparison queries: verify record counts and content hashes match between old and new datasets for all 6 tables
4. Update dbt source definition: point to the new dataset (the one-line YAML change shown earlier)
5. Run dbt with full-refresh: rebuild all downstream models from the new source
6. Disable the old managed connector: stop paying for it
Step 3 is the critical validation. I wrote a comparison script that checks active record counts, runs a full outer join on IDs, and compares content hashes for every matching record. Any mismatch would have been a blocker.
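The core of such a comparison is small enough to sketch in plain Python. The version below is illustrative, not the script used in the migration: it assumes the active rows of each dataset have already been loaded into a dict of record ID to content hash, and that the hash is computed the same way over the same columns on both sides. `compare_datasets` is a hypothetical name.

```python
def compare_datasets(old: dict, new: dict) -> dict:
    """Compare two {record_id: content_hash} maps of active rows.

    Equivalent to a full outer join on ID followed by a hash comparison.
    """
    old_ids, new_ids = set(old), set(new)
    return {
        "old_count": len(old),
        "new_count": len(new),
        "missing_in_new": sorted(old_ids - new_ids),
        "missing_in_old": sorted(new_ids - old_ids),
        "hash_mismatch": sorted(
            record_id for record_id in old_ids & new_ids
            if old[record_id] != new[record_id]
        ),
    }


def is_clean(report: dict) -> bool:
    """True when counts match and no record is missing or different."""
    return (
        report["old_count"] == report["new_count"]
        and not report["missing_in_new"]
        and not report["missing_in_old"]
        and not report["hash_mismatch"]
    )


# Hypothetical data: ticket 3 differs, ticket 4 exists only in the new dataset
old_rows = {"1": "aaa", "2": "bbb", "3": "ccc"}
new_rows = {"1": "aaa", "2": "bbb", "3": "ccd", "4": "ddd"}
report = compare_datasets(old_rows, new_rows)
```

Running the check per table and treating any non-clean report as a blocker matches the rule described above.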
Results
- Latency: Webhook events appear in BigQuery within 2-3 seconds (vs 15-60 minutes with the managed connector’s sync interval)
- Reliability: Zero missed records in the first month (the weekly full reconciliation has found 0 discrepancies so far)
- Cost: Cloud Run costs about 8 euros/month (with min-instances set to 1 during business hours). The managed connector cost significantly more.
- dbt changes: One line changed (dataset name in source YAML). Zero downstream model changes.
Lessons
- Match the CDC column naming exactly. If your dbt models expect _fivetran_active, give them _fivetran_active. Renaming columns to be “more correct” is not worth the cascade of downstream changes.
- Never trust webhook payloads as source of truth. Always re-fetch the full record from the API. Webhooks tell you what changed; the API tells you the current state.
- Row hashing gives you idempotency for free. Hash the record content, compare before writing. This handles duplicate webhooks, out-of-order delivery, and repeated syncs without any special logic.
- Full reconciliation is your safety net. Even with perfect webhook handling, run a scheduled full sync. It catches edge cases you have not thought of yet.
- Min-instance scheduling saves money and latency. Pay for a warm instance during working hours when webhooks are frequent. Scale to zero at night when support tickets are not being updated.
- Store OAuth refresh tokens in Secret Manager, not in environment variables. When a refresh token rotates, you need to persist the new one. Environment variables are immutable per Cloud Run revision. Secret Manager lets you update tokens without redeploying.
The pattern is generic enough that I have since used the same codebase structure to replace a second managed connector for CRM data. Same CDC column naming trick, same webhook + reconciliation architecture, same zero-dbt-change migration. If you are paying for a managed connector that syncs fewer than 10 tables, it is worth evaluating whether a custom Cloud Run service would be simpler and cheaper.
