Migrating a SaaS Helpdesk Connector to a Custom Cloud Run Sync

by Marc

We had a managed connector (think Fivetran, Airbyte, or similar) syncing our helpdesk data — tickets, contacts, threads, comments — from a SaaS helpdesk platform into BigQuery. It worked fine until it did not: the connector would miss webhook events, lag behind on incremental syncs, and occasionally produce phantom duplicates that our dbt models would faithfully propagate into dashboards. The cost was also non-trivial for what amounted to six tables.

I replaced it with a custom Cloud Run service that receives webhooks for real-time CDC and runs scheduled full reconciliations. The entire migration, including dbt source changes, took about a week. Here is how it works.

Architecture Overview

The high-level architecture:

                                        ┌──────────────────────────┐
                                        │   Cloud Scheduler        │
                                        │   (daily + weekly full)  │
                                        └──────────┬───────────────┘
                                                   │ POST /sync
                                                   ▼
┌──────────────┐   webhooks    ┌──────────────────────────────────┐
│  Helpdesk    │──────────────►│  Cloud Run Service               │
│  Platform    │ POST /webhook │  (Python / FastAPI)              │
│  (SaaS)      │               │                                  │
│              │◄──────────────│  OAuth2 token refresh            │
│              │   API calls   │  Incremental search API sync     │
│              │               │  Webhook event processing        │
└──────────────┘               │  Full reconciliation mode        │
                               └──────────┬───────────────────────┘
                                          │
                                          │ BigQuery writes
                                          ▼
                               ┌──────────────────────────────┐
                               │  BigQuery Dataset            │
                               │  (CDC tables with            │
                               │   _fivetran_* columns)       │
                               │                              │
                               │  tickets_history             │
                               │  contacts_history            │
                               │  threads_history             │
                               │  comments_history            │
                               │  agents_history              │
                               │  departments_history         │
                               └──────────┬───────────────────┘
                                          │
                                          ▼
                               ┌──────────────────────────────┐
                               │  dbt Models                  │
                               │  (no model changes --        │
                               │   same CDC column names)     │
                               └──────────────────────────────┘

Two entry points into the service:

  • POST /webhook — receives real-time events from the helpdesk platform (ticket created, updated, commented, etc.)
  • POST /sync — triggered by Cloud Scheduler for incremental and full reconciliation syncs

Challenge 1: OAuth Token Refresh

The helpdesk platform uses OAuth2 with refresh tokens. The access token expires every hour. The refresh token itself is long-lived but must be stored securely and updated whenever it rotates (some OAuth implementations rotate refresh tokens on every use).

I store the refresh token in Google Secret Manager. Whenever the token endpoint returns a rotated refresh token, I write it back as a new secret version:

"""oauth.py -- OAuth2 token management with Secret Manager storage."""

import json
import threading
import time
import urllib.parse
import urllib.request

from google.cloud import secretmanager

# Cache the access token in memory (per container instance)
_token_cache = {"access_token": None, "expires_at": 0}
# FastAPI runs blocking work in a thread pool, so serialise refreshes: two
# concurrent refreshes could each rotate the refresh token and leave a
# stale one in Secret Manager.
_token_lock = threading.Lock()

SECRET_PROJECT = "my-gcp-project"
REFRESH_TOKEN_SECRET = "helpdesk-oauth-refresh-token"
CLIENT_ID_SECRET = "helpdesk-oauth-client-id"
CLIENT_SECRET_SECRET = "helpdesk-oauth-client-secret"
TOKEN_URL = "https://accounts.helpdesk-platform.com/oauth/v2/token"
API_BASE = "https://desk.helpdesk-platform.com/api/v1"
HTTP_TIMEOUT = 30  # seconds; never let a hung connection block a request


def _get_secret(name: str) -> str:
    """Read a secret from Google Secret Manager."""
    client = secretmanager.SecretManagerServiceClient()
    path = f"projects/{SECRET_PROJECT}/secrets/{name}/versions/latest"
    response = client.access_secret_version(request={"name": path})
    return response.payload.data.decode("utf-8").strip()


def _update_secret(name: str, value: str) -> None:
    """Write a new version of a secret to Secret Manager."""
    client = secretmanager.SecretManagerServiceClient()
    parent = f"projects/{SECRET_PROJECT}/secrets/{name}"
    client.add_secret_version(
        request={
            "parent": parent,
            "payload": {"data": value.encode("utf-8")},
        }
    )


def get_access_token() -> str:
    """Get a valid access token, refreshing if necessary."""
    with _token_lock:
        now = time.time()

        # Return cached token if still valid (with 60s buffer)
        if _token_cache["access_token"] and _token_cache["expires_at"] > now + 60:
            return _token_cache["access_token"]

        # Refresh the token
        refresh_token = _get_secret(REFRESH_TOKEN_SECRET)
        data = urllib.parse.urlencode({
            "grant_type": "refresh_token",
            "client_id": _get_secret(CLIENT_ID_SECRET),
            "client_secret": _get_secret(CLIENT_SECRET_SECRET),
            "refresh_token": refresh_token,
        }).encode()

        req = urllib.request.Request(TOKEN_URL, data=data, method="POST")
        req.add_header("Content-Type", "application/x-www-form-urlencoded")

        with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT) as resp:
            token_data = json.loads(resp.read())

        _token_cache["access_token"] = token_data["access_token"]
        _token_cache["expires_at"] = now + token_data.get("expires_in", 3600)

        # If the refresh token was rotated, persist the new one immediately
        new_refresh = token_data.get("refresh_token")
        if new_refresh and new_refresh != refresh_token:
            _update_secret(REFRESH_TOKEN_SECRET, new_refresh)

        return _token_cache["access_token"]


def api_request(method: str, endpoint: str, params: dict = None,
                body: dict = None) -> dict:
    """Make an authenticated API request to the helpdesk platform."""
    token = get_access_token()
    url = f"{API_BASE}{endpoint}"

    if params:
        url += "?" + urllib.parse.urlencode(params)

    data = json.dumps(body).encode() if body is not None else None
    req = urllib.request.Request(url, data=data, method=method)
    req.add_header("Authorization", f"Zoho-oauthtoken {token}")
    req.add_header("Content-Type", "application/json")

    with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT) as resp:
        raw = resp.read()
    # Some endpoints answer "no results" with an empty body (e.g. HTTP 204)
    return json.loads(raw) if raw else {}
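The caching rule inside get_access_token reuses the token until it is within 60 seconds of expiry. That rule is easy to get subtly wrong. The hypothetical TokenCache class below isolates it behind an injectable clock, so it can be unit-tested without calling the real token endpoint. The class name, the refresh callable and the fake values are all illustrative and not part of the service.

```python
import time
from typing import Callable, Optional, Tuple


class TokenCache:
    """Hypothetical access-token cache with an expiry safety buffer.

    `refresh` returns (access_token, expires_in_seconds); `clock`
    defaults to time.time but can be replaced in tests.
    """

    def __init__(self, refresh: Callable[[], Tuple[str, float]],
                 buffer_s: float = 60,
                 clock: Callable[[], float] = time.time):
        self._refresh = refresh
        self._buffer_s = buffer_s
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        now = self._clock()
        # Reuse the token only if it stays valid for more than buffer_s
        if self._token and self._expires_at > now + self._buffer_s:
            return self._token
        self._token, expires_in = self._refresh()
        self._expires_at = now + expires_in
        return self._token


# Fake clock and fake token endpoint, to show exactly when a refresh happens
fake_now = [0.0]
issued = []


def fake_refresh() -> Tuple[str, float]:
    issued.append(f"token-{len(issued) + 1}")
    return issued[-1], 3600


cache = TokenCache(fake_refresh, clock=lambda: fake_now[0])
first = cache.get()    # no token yet: refresh -> "token-1"
fake_now[0] = 3500     # 100 s left, outside the 60 s buffer
second = cache.get()   # reused -> "token-1"
fake_now[0] = 3541     # 59 s left, inside the buffer
third = cache.get()    # refresh -> "token-2"
```

The buffer means a token is never handed out when it might expire mid-request. It costs at most one extra refresh per hour.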

Challenge 2: Incremental Sync via Search API

The helpdesk’s list endpoints do not support filtering by modification date. But the search endpoint does. For incremental syncs, I query the search API for records modified since the last sync, then fetch full details for each one.

A critical detail: the search API has pagination limits (typically 50 results per page) and is rate-limited. I built a generic paginating fetcher:

"""sync.py -- Incremental and full sync logic."""

import logging
from datetime import datetime, timedelta, timezone

from google.api_core.exceptions import NotFound
from google.cloud import bigquery

from cdc import write_to_bigquery
from oauth import api_request

logger = logging.getLogger(__name__)

BQ_CLIENT = bigquery.Client(project="my-gcp-project")
DATASET = "helpdesk_sync"
PAGE_SIZE = 50
API_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.000Z"


def fetch_all_pages(endpoint: str, params: dict, key: str = "data") -> list:
    """Paginate through all results from an API endpoint.

    The helpdesk API uses offset-based pagination with a 'from' parameter.
    IMPORTANT: a full page (exactly PAGE_SIZE results) means there MAY be
    more pages; only a short or empty page proves we reached the end.
    """
    all_records = []
    offset = 0

    while True:
        page_params = {**params, "from": offset, "limit": PAGE_SIZE}
        response = api_request("GET", endpoint, params=page_params)

        records = response.get(key, [])
        if not records:
            break

        all_records.extend(records)
        logger.info(f"Fetched {len(all_records)} records from {endpoint}")

        # If we got fewer than PAGE_SIZE, we have reached the last page
        if len(records) < PAGE_SIZE:
            break

        offset += PAGE_SIZE

    return all_records


def get_last_sync_time(table: str) -> datetime:
    """Get the most recent _fivetran_synced timestamp from BigQuery."""
    query = f"""
        SELECT MAX(_fivetran_synced) AS last_sync
        FROM `{BQ_CLIENT.project}.{DATASET}.{table}`
    """
    try:
        rows = list(BQ_CLIENT.query(query).result())
    except NotFound:
        rows = []  # First run: the history table does not exist yet
    if rows and rows[0].last_sync:
        return rows[0].last_sync.replace(tzinfo=timezone.utc)
    # Default: sync from 30 days ago
    return datetime.now(timezone.utc) - timedelta(days=30)


def sync_table_incremental(table_config: dict) -> int:
    """Incremental sync: fetch records modified since last sync."""
    table_name = table_config["table"]
    search_endpoint = table_config["search_endpoint"]
    detail_endpoint = table_config["detail_endpoint"]
    id_field = table_config.get("id_field", "id")

    if not search_endpoint:
        # No search API for this entity (agents, departments; threads come
        # via their parent ticket): webhooks and full reconciliation cover it
        logger.info(f"No search endpoint for {table_name}; skipping incremental")
        return 0

    last_sync = get_last_sync_time(f"{table_name}_history")
    # Go back 1 second to avoid missing records at the boundary
    since = (last_sync - timedelta(seconds=1)).strftime(API_TIME_FORMAT)
    until = datetime.now(timezone.utc).strftime(API_TIME_FORMAT)

    logger.info(f"Incremental sync {table_name} since {since}")

    # Search for records modified in [since, until]
    search_params = {
        "modifiedTimeRange": f"{since},{until}",
        "sortBy": "modifiedTime",
    }

    records = fetch_all_pages(search_endpoint, search_params, "data")
    logger.info(f"Found {len(records)} modified {table_name} records")

    if not records:
        return 0

    # Fetch full details for each record (search results are partial)
    full_records = []
    for record in records:
        record_id = record[id_field]
        try:
            detail = api_request("GET", f"{detail_endpoint}/{record_id}")
            full_records.append(detail)
        except Exception as e:
            logger.error(f"Failed to fetch {table_name} {record_id}: {e}")

    # Write to BigQuery with CDC columns
    write_to_bigquery(table_name, full_records)
    return len(full_records)
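The stop condition in fetch_all_pages has one consequence worth seeing concretely. When the total is an exact multiple of the page size, the loop makes one extra request that comes back empty. The sketch below separates the loop from HTTP. It takes a hypothetical fetch_page(offset, limit) callable as a stand-in for api_request with the 'from' and 'limit' parameters, assumes 0-based offsets as in the code above, and runs against an in-memory fake.

```python
from typing import Callable, List, Tuple


def fetch_all_offset_pages(fetch_page: Callable[[int, int], List[dict]],
                           page_size: int = 50) -> List[dict]:
    """Offset pagination that stops on a short or empty page."""
    records: List[dict] = []
    offset = 0
    while True:
        page = fetch_page(offset, page_size)
        if not page:
            break                      # empty page: nothing left
        records.extend(page)
        if len(page) < page_size:
            break                      # short page: this was the last one
        offset += page_size
    return records


def make_fake_api(total: int) -> Tuple[Callable[[int, int], List[dict]], List[int]]:
    """Hypothetical fake endpoint that records which offsets were requested."""
    data = [{"id": str(i)} for i in range(total)]
    calls: List[int] = []

    def fetch_page(offset: int, limit: int) -> List[dict]:
        calls.append(offset)
        return data[offset:offset + limit]

    return fetch_page, calls


fetch_100, calls_100 = make_fake_api(100)   # exact multiple of 50
rows_100 = fetch_all_offset_pages(fetch_100)
fetch_120, calls_120 = make_fake_api(120)   # last page is short
rows_120 = fetch_all_offset_pages(fetch_120)
```

Both runs make three calls. With 100 records the third call returns nothing. With 120 it returns the final 20 and the short page ends the loop.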

Challenge 3: CDC Column Naming (The Zero-Change Migration)

This was the key architectural decision. Our dbt models expected CDC columns from the old managed connector:

  • _fivetran_active (BOOLEAN) — is this the current version of the record?
  • _fivetran_start (TIMESTAMP) — when this version became active
  • _fivetran_end (TIMESTAMP) — when this version was superseded (NULL if current)
  • _fivetran_synced (TIMESTAMP) — when the sync captured this version
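To make the four columns concrete, here is a hypothetical in-memory simulation of the same semantics. A list of dicts stands in for a *_history table, and apply_version is an illustrative helper, not the service's code. A duplicate capture is a no-op. A real change closes the current version and opens a new one, so each record always has exactly one active row.

```python
from datetime import datetime, timezone
from typing import List


def apply_version(history: List[dict], record_id: str, row_hash: str,
                  now: datetime) -> None:
    """Record one captured version in an in-memory history table."""
    active = [r for r in history
              if r["id"] == record_id and r["_fivetran_active"]]
    if active and active[0]["_row_hash"] == row_hash:
        return  # Same content as the current version: nothing to do

    for row in active:
        # Close the current version at the moment the new one starts
        row["_fivetran_active"] = False
        row["_fivetran_end"] = now

    history.append({
        "id": record_id,
        "_row_hash": row_hash,
        "_fivetran_active": True,
        "_fivetran_start": now,
        "_fivetran_end": None,      # NULL = current version
        "_fivetran_synced": now,
    })


t1 = datetime(2024, 5, 6, 9, 0, tzinfo=timezone.utc)
t2 = datetime(2024, 5, 6, 9, 5, tzinfo=timezone.utc)
t3 = datetime(2024, 5, 6, 9, 30, tzinfo=timezone.utc)

history: List[dict] = []
apply_version(history, "42", "hash-a", t1)  # ticket created
apply_version(history, "42", "hash-a", t2)  # duplicate capture: ignored
apply_version(history, "42", "hash-b", t3)  # ticket changed
# Ticket 42 now has two rows:
#   hash-a: active=False, start=t1, end=t3
#   hash-b: active=True,  start=t3, end=None
```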

Rather than changing the dbt models (which would cascade through multiple layers and require a full-refresh), I kept the exact same column names in the new dataset. The CDC logic in the write function implements the same semantics:

"""cdc.py -- CDC (Change Data Capture) write logic with Fivetran-compatible columns."""

import hashlib
import json
import logging
from datetime import datetime, timezone

from google.api_core.exceptions import NotFound
from google.cloud import bigquery

logger = logging.getLogger(__name__)

BQ_CLIENT = bigquery.Client(project="my-gcp-project")
DATASET = "helpdesk_sync"


def compute_row_hash(record: dict) -> str:
    """Deterministic hash of record contents for change detection."""
    # Sort keys for deterministic serialisation. MD5 is fine here: it only
    # detects changes and protects nothing.
    serialised = json.dumps(record, sort_keys=True, default=str)
    return hashlib.md5(serialised.encode()).hexdigest()


def write_to_bigquery(table_name: str, records: list):
    """Write records to BigQuery with Fivetran-compatible CDC columns.

    For each record:
    1. Check if the record already exists and is active
    2. If it exists and content changed: deactivate old row, insert new active row
    3. If it exists and content unchanged: skip (idempotent)
    4. If it does not exist: insert new active row
    """
    if not records:
        return

    history_table = f"{BQ_CLIENT.project}.{DATASET}.{table_name}_history"
    now = datetime.now(timezone.utc)
    ids = sorted({str(r["id"]) for r in records})

    # Fetch current active rows for these IDs. IDs are passed as a query
    # parameter rather than formatted into the SQL string.
    existing_query = f"""
        SELECT CAST(id AS STRING) AS id, _fivetran_start, _row_hash
        FROM `{history_table}`
        WHERE _fivetran_active = TRUE
        AND CAST(id AS STRING) IN UNNEST(@ids)
    """
    existing = {}
    try:
        job = BQ_CLIENT.query(
            existing_query,
            job_config=bigquery.QueryJobConfig(query_parameters=[
                bigquery.ArrayQueryParameter("ids", "STRING", ids),
            ]),
        )
        for row in job.result():
            existing[row.id] = {
                "start": row._fivetran_start,
                "hash": row._row_hash,
            }
    except NotFound:
        pass  # Table does not exist yet on first sync

    rows_to_deactivate = []
    rows_to_insert = []

    for record in records:
        record_id = str(record["id"])
        row_hash = compute_row_hash(record)

        if record_id in existing:
            if existing[record_id]["hash"] == row_hash:
                # No change -- skip
                continue

            # Content changed -- deactivate old row
            rows_to_deactivate.append({
                "id": record_id,
                "old_start": existing[record_id]["start"],
            })

        # Insert new active row with Fivetran-compatible CDC columns
        row = flatten_record(record)
        row["_fivetran_active"] = True
        row["_fivetran_start"] = now.isoformat()
        row["_fivetran_end"] = None  # NULL = currently active
        row["_fivetran_synced"] = now.isoformat()
        row["_row_hash"] = row_hash

        rows_to_insert.append(row)

    # Deactivate old rows (set _fivetran_active=FALSE, _fivetran_end=now)
    if rows_to_deactivate:
        deactivate_query = f"""
            UPDATE `{history_table}`
            SET _fivetran_active = FALSE,
                _fivetran_end = @now
            WHERE _fivetran_active = TRUE
            AND CAST(id AS STRING) IN UNNEST(@ids)
        """
        job_config = bigquery.QueryJobConfig(query_parameters=[
            bigquery.ArrayQueryParameter(
                "ids", "STRING", [r["id"] for r in rows_to_deactivate]),
            bigquery.ScalarQueryParameter("now", "TIMESTAMP", now),
        ])
        BQ_CLIENT.query(deactivate_query, job_config=job_config).result()

    # Insert new rows with a load job instead of insert_rows_json. Rows
    # written through the streaming API cannot be UPDATEd while they sit in
    # the streaming buffer, which would break the next change to the same
    # record. Load jobs count against a per-table daily quota, so very high
    # webhook volumes may need micro-batching.
    if rows_to_insert:
        load_config = bigquery.LoadJobConfig(
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        )
        BQ_CLIENT.load_table_from_json(
            rows_to_insert, history_table, job_config=load_config
        ).result()  # Raises if the load fails

    logger.info(
        f"{table_name}: {len(rows_to_insert)} inserted, "
        f"{len(rows_to_deactivate)} deactivated, "
        f"{len(records) - len(rows_to_insert)} unchanged"
    )


def flatten_record(record: dict, prefix: str = "") -> dict:
    """Flatten nested dicts into underscore-separated column names.

    The helpdesk API returns nested objects (e.g. contact.name), which
    become flat BigQuery columns (contact_name). Lists are stored as
    JSON strings.
    """
    flat = {}
    for key, value in record.items():
        col_name = f"{prefix}_{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten_record(value, col_name))
        elif isinstance(value, list):
            flat[col_name] = json.dumps(value)
        else:
            flat[col_name] = value
    return flat
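Two details in cdc.py carry the idempotency guarantee. The hash must not depend on key order, and nested objects must map to stable column names. The self-contained check below uses copies of the two helpers and hypothetical ticket payloads to show both. The same content in a different key order hashes identically, even though plain json.dumps output differs.

```python
import hashlib
import json


def compute_row_hash(record: dict) -> str:
    # Same approach as cdc.py: canonical (sorted-key) JSON, then MD5
    serialised = json.dumps(record, sort_keys=True, default=str)
    return hashlib.md5(serialised.encode()).hexdigest()


def flatten_record(record: dict, prefix: str = "") -> dict:
    # Same approach as cdc.py: nested dicts -> underscore-joined columns,
    # lists -> JSON strings
    flat = {}
    for key, value in record.items():
        col = f"{prefix}_{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten_record(value, col))
        elif isinstance(value, list):
            flat[col] = json.dumps(value)
        else:
            flat[col] = value
    return flat


# Hypothetical payloads: identical content, different key order
a = {"id": "42", "status": "Open", "contact": {"name": "Ana", "id": "7"}}
b = {"contact": {"id": "7", "name": "Ana"}, "status": "Open", "id": "42"}

same_hash = compute_row_hash(a) == compute_row_hash(b)   # True
raw_json_differs = json.dumps(a) != json.dumps(b)        # True
flat = flatten_record({"id": "42", "contact": {"name": "Ana"}, "tags": ["vip"]})
```

Here flat is {"id": "42", "contact_name": "Ana", "tags": '["vip"]'}. The hash covers every field the API returns, including timestamps such as modifiedTime. As a result, any touch to a record produces a new version, even when no business field changed.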

Because the column names are identical, the dbt source definitions only needed the dataset name changed:

# dbt source definition -- the ONLY change needed
sources:
  - name: helpdesk
    # OLD: dataset from managed connector
    # database: my-project-raw
    # schema: helpdesk_fivetran

    # NEW: dataset from custom sync
    database: my-project-raw
    schema: helpdesk_sync

    tables:
      - name: tickets_history
      - name: contacts_history
      - name: threads_history
      - name: comments_history
      - name: agents_history
      - name: departments_history

Zero changes to any downstream dbt model. The staging models, intermediate models, and reporting models all work unchanged because they reference source('helpdesk', 'tickets_history') and the CDC column semantics are identical.

Challenge 4: Webhook Event Ordering

Webhooks arrive in near-real-time, but not necessarily in order. If a ticket is updated twice in quick succession, the webhook for the second update might arrive before the first. If we process them naively, we end up with the old state overwriting the new state.

The solution has two parts:

  1. Always fetch the current state from the API — never trust the webhook payload as the source of truth. The webhook tells us what changed; we then fetch the full current record from the API to get the actual state.
  2. Use the row hash for idempotency — if the current state matches what we already have in BigQuery, skip the write. This handles duplicate webhooks and out-of-order processing gracefully.
"""webhook.py -- Webhook event handler."""

import hashlib
import hmac
import json
import logging
from datetime import datetime, timezone

from fastapi import Request, Response
from fastapi.concurrency import run_in_threadpool
from google.cloud import bigquery

from cdc import BQ_CLIENT, DATASET, write_to_bigquery
from oauth import api_request
from sync import TABLE_CONFIGS

# No FastAPI app here: main.py owns the app and routes POST /webhook to
# handle_webhook below.
logger = logging.getLogger(__name__)

WEBHOOK_SECRET = None  # Loaded from Secret Manager at startup


def verify_webhook_signature(payload: bytes, signature: str) -> bool:
    """Verify the webhook payload signature using HMAC-SHA256."""
    if not WEBHOOK_SECRET:
        return False
    expected = hmac.new(
        WEBHOOK_SECRET.encode(), payload, hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(expected, signature)


async def handle_webhook(request: Request):
    """Process incoming webhook events from the helpdesk platform."""
    payload = await request.body()
    signature = request.headers.get("X-Webhook-Signature", "")

    if not verify_webhook_signature(payload, signature):
        return Response(status_code=401, content="Invalid signature")

    # Parse the exact bytes that were verified
    event = json.loads(payload)
    event_type = event.get("eventType", "")
    entity = event.get("entity", {})
    entity_type = entity.get("type", "")
    entity_id = entity.get("id")

    logger.info(f"Webhook: {event_type} on {entity_type} {entity_id}")

    # Map entity type to table config
    table_config = next(
        (c for c in TABLE_CONFIGS if c["entity_type"] == entity_type), None
    )
    if not table_config:
        logger.warning(f"Unknown entity type: {entity_type}")
        return {"status": "ignored"}

    # The API and BigQuery clients block, so run them in a worker thread
    # instead of stalling the event loop for every other request.
    if event_type in ("delete", "trash"):
        # For deletes: deactivate the record without inserting a new version
        await run_in_threadpool(handle_delete, table_config["table"], entity_id)
        return {"status": "deleted"}

    # For creates and updates: fetch current state from API
    # (never trust webhook payload -- it may be partial or stale)
    try:
        record = await run_in_threadpool(
            api_request, "GET", f"{table_config['detail_endpoint']}/{entity_id}"
        )
    except Exception as e:
        logger.error(f"Failed to fetch {entity_type} {entity_id}: {e}")
        # A 5xx signals failure so the platform can retry; keep internals out
        return Response(status_code=500, content="Upstream fetch failed")

    # Write to BigQuery (idempotent -- skips if content unchanged)
    await run_in_threadpool(write_to_bigquery, table_config["table"], [record])

    return {"status": "processed"}


def handle_delete(table_name: str, record_id: str):
    """Deactivate a deleted record in BigQuery."""
    history_table = f"{BQ_CLIENT.project}.{DATASET}.{table_name}_history"
    query = f"""
        UPDATE `{history_table}`
        SET _fivetran_active = FALSE,
            _fivetran_end = @now
        WHERE _fivetran_active = TRUE
        AND CAST(id AS STRING) = @record_id
    """
    # Query parameters instead of string formatting: record_id comes from
    # an external request
    job_config = bigquery.QueryJobConfig(query_parameters=[
        bigquery.ScalarQueryParameter(
            "now", "TIMESTAMP", datetime.now(timezone.utc)),
        bigquery.ScalarQueryParameter("record_id", "STRING", str(record_id)),
    ])
    BQ_CLIENT.query(query, job_config=job_config).result()
    logger.info(f"Deactivated deleted {table_name} record {record_id}")
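The ordering argument is easiest to see with two deliveries swapped. In the hypothetical simulation below, a dict plays the role of the helpdesk API and a list plays the history table. A naive handler that trusts payloads ends on the stale state. The re-fetch-plus-hash approach writes one correct version and skips the second delivery.

```python
import hashlib
import json


def row_hash(record: dict) -> str:
    return hashlib.md5(json.dumps(record, sort_keys=True).encode()).hexdigest()


# Hypothetical source of truth: ticket 42 after two quick updates
api_state = {"42": {"id": "42", "status": "Closed", "version": 2}}

# Webhooks delivered in the WRONG order: update 2 arrives before update 1
deliveries = [
    {"entity": {"id": "42"}, "payload": {"id": "42", "status": "Closed", "version": 2}},
    {"entity": {"id": "42"}, "payload": {"id": "42", "status": "Pending", "version": 1}},
]

# Naive handler: trust the payload, so the last delivery wins
naive = {}
for event in deliveries:
    naive[event["entity"]["id"]] = event["payload"]

# Re-fetch handler: ignore the payload, read current state, skip same hash
versions = []      # rows appended to the history table
active_hash = {}   # current hash per record ID
for event in deliveries:
    record_id = event["entity"]["id"]
    current = api_state[record_id]       # stands in for api_request("GET", ...)
    h = row_hash(current)
    if active_hash.get(record_id) == h:
        continue                         # duplicate or late delivery: no-op
    active_hash[record_id] = h
    versions.append(current)
```

The naive state ends as "Pending", while the history table holds a single "Closed" version. One caveat is an assumption about concurrency rather than something the code above handles. If two handlers for the same record run in parallel, an older fetch could still be committed after a newer one. Comparing the fetched record's modifiedTime with the stored one before writing would close that gap.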

Challenge 5: Full Reconciliation

Webhooks can be lost (platform outages, network issues, our service being down during a deploy). The scheduled full reconciliation catches anything that fell through the cracks.

A “full” sync does not mean re-ingesting everything from scratch. It means:

  1. Fetch all record IDs and modification timestamps from the API
  2. Compare with what we have in BigQuery
  3. Fetch and upsert only the records that are missing or have a newer modification timestamp

def sync_table_full(table_config: dict) -> int:
    """Full reconciliation: compare API state with BigQuery state."""
    # Imported here because webhook.py imports sync.py; a module-level
    # import would be circular.
    from webhook import handle_delete

    table_name = table_config["table"]
    list_endpoint = table_config["list_endpoint"]
    detail_endpoint = table_config["detail_endpoint"]

    logger.info(f"Full reconciliation for {table_name}")

    # Step 1: Get all records from the API (paginated)
    api_records = fetch_all_pages(list_endpoint, {}, "data")
    api_index = {str(r["id"]): r.get("modifiedTime", "") for r in api_records}
    logger.info(f"API has {len(api_index)} {table_name} records")

    # Step 2: Get all active records from BigQuery
    history_table = f"{BQ_CLIENT.project}.{DATASET}.{table_name}_history"
    bq_query = f"""
        SELECT CAST(id AS STRING) AS id, _fivetran_synced
        FROM `{history_table}`
        WHERE _fivetran_active = TRUE
    """
    bq_index = {}
    try:
        for row in BQ_CLIENT.query(bq_query).result():
            bq_index[row.id] = row._fivetran_synced
    except Exception as e:
        # Usually a missing table on the first run; log instead of hiding it
        logger.warning(f"Could not read {history_table}: {e}")

    # Step 3: Find records that are new, or modified after our last capture
    to_fetch = []
    for record_id, modified_time in api_index.items():
        if record_id not in bq_index:
            to_fetch.append(record_id)  # In API but not in BQ (new)
        elif modified_time:
            # Parse instead of comparing ISO strings: API timestamps end in
            # "Z" while isoformat() produces "+00:00"
            api_modified = datetime.fromisoformat(
                modified_time.replace("Z", "+00:00")
            )
            if api_modified > bq_index[record_id]:
                to_fetch.append(record_id)

    # Records in BQ but not in API (deleted in source)
    deleted_ids = set(bq_index) - set(api_index)
    for record_id in deleted_ids:
        handle_delete(table_name, record_id)

    # Deleted IDs are not in api_index, so they are not subtracted here
    logger.info(
        f"Reconciliation: {len(to_fetch)} to fetch, "
        f"{len(deleted_ids)} deleted, "
        f"{len(api_index) - len(to_fetch)} unchanged"
    )

    # Step 4: Fetch and write new or updated records
    if to_fetch:
        full_records = []
        for record_id in to_fetch:
            try:
                record = api_request("GET", f"{detail_endpoint}/{record_id}")
                full_records.append(record)
            except Exception as e:
                logger.error(f"Failed to fetch {table_name} {record_id}: {e}")

        write_to_bigquery(table_name, full_records)

    return len(to_fetch)
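Steps 1 to 3 of the reconciliation reduce to a set comparison. The hypothetical plan_reconciliation helper below pulls that logic out of the I/O, which makes it easy to test. It assumes the API's modifiedTime values are ISO 8601 strings ending in "Z" and the BigQuery side is already a map of ID to timezone-aware _fivetran_synced datetimes.

```python
from datetime import datetime, timezone
from typing import Dict, Set, Tuple


def plan_reconciliation(api_modified: Dict[str, str],
                        bq_synced: Dict[str, datetime]
                        ) -> Tuple[Set[str], Set[str], Set[str]]:
    """Split record IDs into (new, updated, deleted)."""
    new = set(api_modified) - set(bq_synced)
    deleted = set(bq_synced) - set(api_modified)
    updated = set()
    for record_id in set(api_modified) & set(bq_synced):
        ts = api_modified[record_id]
        if ts and datetime.fromisoformat(ts.replace("Z", "+00:00")) > bq_synced[record_id]:
            updated.add(record_id)
    return new, updated, deleted


synced_at = datetime(2024, 5, 2, 0, 0, tzinfo=timezone.utc)
api = {
    "1": "2024-05-01T10:00:00.000Z",  # not in BigQuery yet
    "2": "2024-05-03T09:00:00.000Z",  # modified after our capture
    "3": "2024-05-01T08:00:00.000Z",  # unchanged since our capture
}
bq = {"2": synced_at, "3": synced_at, "4": synced_at}  # "4" gone from the API

new, updated, deleted = plan_reconciliation(api, bq)
```

Record 1 is new, record 2 is updated, record 4 is deleted, and record 3 is left alone.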

The FastAPI Application

The Cloud Run service ties everything together:

"""main.py -- Cloud Run entry point."""

import hmac
import logging
import os

from fastapi import FastAPI, Request, Response

from sync import sync_table_incremental, sync_table_full, TABLE_CONFIGS
from webhook import handle_webhook

app = FastAPI()
logger = logging.getLogger(__name__)

# Auth for scheduled triggers
SYNC_SECRET = os.environ.get("SYNC_SECRET", "")


@app.get("/")
async def health():
    """Health check."""
    return {"status": "ok", "tables": len(TABLE_CONFIGS)}


# A plain `def` (not `async def`): FastAPI runs it in a worker thread, so the
# long, blocking API and BigQuery calls do not freeze the event loop that
# also serves /webhook.
@app.post("/sync")
def sync(request: Request):
    """Triggered by Cloud Scheduler. Runs incremental or full sync."""
    # Authenticate via shared secret; an unset secret must not mean "open"
    provided_secret = request.headers.get("X-Sync-Secret", "")
    if not SYNC_SECRET or not hmac.compare_digest(provided_secret, SYNC_SECRET):
        return Response(status_code=403, content="Forbidden")

    mode = request.query_params.get("mode", "incremental")
    results = {}

    for config in TABLE_CONFIGS:
        table_name = config["table"]
        try:
            if mode == "full":
                count = sync_table_full(config)
            else:
                count = sync_table_incremental(config)
            results[table_name] = {"status": "ok", "records": count}
        except Exception as e:
            logger.exception(f"Sync failed for {table_name}")
            results[table_name] = {"status": "error", "error": str(e)}

    return {"mode": mode, "results": results}


@app.post("/webhook")
async def webhook_handler(request: Request):
    """Receives real-time events from the helpdesk platform."""
    return await handle_webhook(request)


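# --- sync.py (continued) ---
# Table configurations live in sync.py, which is where main.py and
# webhook.py import TABLE_CONFIGS from.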
TABLE_CONFIGS = [
    {
        "table": "tickets",
        "entity_type": "ticket",
        "list_endpoint": "/tickets",
        "search_endpoint": "/tickets/search",
        "detail_endpoint": "/tickets",
    },
    {
        "table": "contacts",
        "entity_type": "contact",
        "list_endpoint": "/contacts",
        "search_endpoint": "/contacts/search",
        "detail_endpoint": "/contacts",
    },
    {
        "table": "threads",
        "entity_type": "thread",
        "list_endpoint": "/tickets/{ticket_id}/threads",
        "search_endpoint": None,  # threads synced via parent ticket
        "detail_endpoint": "/tickets/{ticket_id}/threads",
    },
    {
        "table": "agents",
        "entity_type": "agent",
        "list_endpoint": "/agents",
        "search_endpoint": None,
        "detail_endpoint": "/agents",
    },
    {
        "table": "departments",
        "entity_type": "department",
        "list_endpoint": "/departments",
        "search_endpoint": None,
        "detail_endpoint": "/departments",
    },
]
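To exercise /webhook locally, for example against a dev revision, the test request needs a signature that verify_webhook_signature accepts. The helpers below assume the scheme the handler implements: a hex HMAC-SHA256 of the raw request body, sent in X-Webhook-Signature. The real platform's header name and encoding may differ. The secret and event are placeholders, and sign_payload and build_test_webhook are illustrative names.

```python
import hashlib
import hmac
import json
from typing import Dict, Tuple


def sign_payload(secret: str, body: bytes) -> str:
    """Hex HMAC-SHA256 of the raw body, as verify_webhook_signature expects."""
    return hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()


def build_test_webhook(secret: str, event: dict) -> Tuple[bytes, Dict[str, str]]:
    """Return (body, headers) for a signed test POST to /webhook."""
    body = json.dumps(event).encode()
    headers = {
        "Content-Type": "application/json",
        "X-Webhook-Signature": sign_payload(secret, body),
    }
    return body, headers


secret = "local-test-secret"  # placeholder, not a real secret
body, headers = build_test_webhook(
    secret, {"eventType": "update", "entity": {"type": "ticket", "id": "42"}}
)
valid = hmac.compare_digest(sign_payload(secret, body), headers["X-Webhook-Signature"])
tampered = hmac.compare_digest(
    sign_payload(secret, body + b" "), headers["X-Webhook-Signature"]
)
```

Sign the exact bytes you send. Re-serialising the JSON with different spacing or key order changes the signature, which is why the handler verifies request.body() before parsing it.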

Cloud Scheduler Setup

Two scheduler jobs handle the automated syncs:

#!/bin/bash
# setup-scheduler.sh

PROJECT="my-gcp-project"
REGION="europe-west3"
SERVICE_URL="https://helpdesk-sync-abc123.europe-west3.run.app"
SYNC_SECRET=$(gcloud secrets versions access latest \
    --secret=helpdesk-sync-secret --project=$PROJECT)

# Daily incremental sync (catches missed webhooks)
gcloud scheduler jobs create http helpdesk-daily-sync \
    --project=$PROJECT \
    --location=$REGION \
    --schedule="0 5 * * *" \
    --time-zone="Europe/Amsterdam" \
    --uri="$SERVICE_URL/sync" \
    --http-method=POST \
    --headers="X-Sync-Secret=$SYNC_SECRET" \
    --oidc-service-account-email="scheduler-sa@$PROJECT.iam.gserviceaccount.com"

# Weekly full reconciliation (catches any drift). A full run can outlast
# Cloud Scheduler's default 3-minute attempt deadline, so raise it; the
# Cloud Run service's request timeout must be at least as long.
gcloud scheduler jobs create http helpdesk-weekly-full-sync \
    --project=$PROJECT \
    --location=$REGION \
    --schedule="0 4 * * 0" \
    --time-zone="Europe/Amsterdam" \
    --uri="$SERVICE_URL/sync?mode=full" \
    --http-method=POST \
    --headers="X-Sync-Secret=$SYNC_SECRET" \
    --attempt-deadline=1800s \
    --oidc-service-account-email="scheduler-sa@$PROJECT.iam.gserviceaccount.com"

Cold Start Handling

Cloud Run scales to zero by default. A webhook arriving when the service is cold leads to a 5-10 second cold start, which can cause the helpdesk platform to time out and retry. Retries are fine (our write logic is idempotent), but the timeout logs create noise.

I solved this with a pair of Cloud Scheduler jobs that toggle min-instances between 1 (during business hours) and 0 (nights and weekends):

# Wake up (business hours)
gcloud scheduler jobs create http helpdesk-sync-wake \
    --project=$PROJECT \
    --location=$REGION \
    --schedule="0 7 * * 1-5" \
    --time-zone="Europe/Amsterdam" \
    --uri="https://run.googleapis.com/v2/projects/$PROJECT/locations/$REGION/services/helpdesk-sync" \
    --http-method=POST \
    --headers="X-HTTP-Method-Override=PATCH,Content-Type=application/json" \
    --message-body='{"scaling":{"minInstanceCount":1}}' \
    --oauth-service-account-email="cloud-run-admin@$PROJECT.iam.gserviceaccount.com"

# Sleep (evenings)
gcloud scheduler jobs create http helpdesk-sync-sleep \
    --project=$PROJECT \
    --location=$REGION \
    --schedule="0 18 * * 1-5" \
    --time-zone="Europe/Amsterdam" \
    --uri="https://run.googleapis.com/v2/projects/$PROJECT/locations/$REGION/services/helpdesk-sync" \
    --http-method=POST \
    --headers="X-HTTP-Method-Override=PATCH,Content-Type=application/json" \
    --message-body='{"scaling":{"minInstanceCount":0}}' \
    --oauth-service-account-email="cloud-run-admin@$PROJECT.iam.gserviceaccount.com"

Note the X-HTTP-Method-Override: PATCH header — Cloud Scheduler does not natively support PATCH requests, so we use the override header with a POST.
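The wake and sleep jobs leave one warm instance from 07:00 to 18:00 Amsterdam time, Monday to Friday, and zero otherwise, including the whole weekend. A small, hypothetical helper that mirrors the two cron expressions makes that window explicit. For example, it can tell you whether a cold start at a given time was expected. It takes a naive datetime already in Europe/Amsterdam local time, an assumption that avoids a time-zone database dependency. It also assumes both jobs run successfully.

```python
from datetime import datetime

WAKE_HOUR = 7    # "0 7 * * 1-5"  -> minInstanceCount = 1
SLEEP_HOUR = 18  # "0 18 * * 1-5" -> minInstanceCount = 0


def expected_min_instances(local_time: datetime) -> int:
    """Min-instance setting in force at a given Amsterdam local time."""
    is_weekday = local_time.weekday() < 5          # Monday=0 ... Friday=4
    in_hours = WAKE_HOUR <= local_time.hour < SLEEP_HOUR
    return 1 if is_weekday and in_hours else 0


checks = {
    datetime(2024, 5, 6, 6, 59): 0,    # Monday, before the wake job
    datetime(2024, 5, 6, 7, 0): 1,     # Monday, wake job has fired
    datetime(2024, 5, 10, 17, 59): 1,  # Friday afternoon
    datetime(2024, 5, 10, 18, 0): 0,   # Friday, sleep job has fired
    datetime(2024, 5, 11, 12, 0): 0,   # Saturday: stays at 0 all weekend
}
```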

The Migration Cutover

The actual migration followed this sequence:

  1. Deploy the service and run a full sync — populates the new dataset alongside the old one
  2. Enable webhooks on the helpdesk platform — new events flow into both the old connector and the new service
  3. Run comparison queries — verify record counts and content hashes match between old and new datasets for all 6 tables
  4. Update dbt source definition — point to the new dataset (the one-line YAML change shown earlier)
  5. Run dbt with full-refresh — rebuild all downstream models from the new source
  6. Disable the old managed connector — stop paying for it

Step 3 is the critical validation. I wrote a comparison script that checks active record counts, runs a full outer join on IDs, and compares content hashes for every matching record. Any mismatch would have been a blocker.
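Once each dataset is reduced to a map of active record ID to content hash, the step 3 comparison is mostly set arithmetic. The sketch below assumes those maps have already been pulled from BigQuery. Both sides must hash the same list of shared columns, for example with FARM_FINGERPRINT(TO_JSON_STRING(...)) in SQL. compare_datasets and is_clean are illustrative names, not the author's script.

```python
from typing import Dict


def compare_datasets(old: Dict[str, str], new: Dict[str, str]) -> dict:
    """Compare {record_id: content_hash} maps of active rows."""
    old_ids, new_ids = set(old), set(new)
    shared = old_ids & new_ids          # inner part of the full outer join
    return {
        "count_old": len(old),
        "count_new": len(new),
        "only_in_old": sorted(old_ids - new_ids),
        "only_in_new": sorted(new_ids - old_ids),
        "hash_mismatch": sorted(i for i in shared if old[i] != new[i]),
    }


def is_clean(report: dict) -> bool:
    """A cutover blocker is any ID on one side only, or any differing hash."""
    return not (report["only_in_old"] or report["only_in_new"]
                or report["hash_mismatch"])


# Hypothetical hashes from the old connector and the new sync
report = compare_datasets(
    old={"1": "a", "2": "b", "3": "c"},
    new={"1": "a", "2": "x", "4": "d"},
)
```

In this made-up example, ID 3 exists only in the old dataset, ID 4 only in the new one, and ID 2 has differing content. That report would block the cutover.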

Results

  • Latency: Webhook events appear in BigQuery within 2-3 seconds (vs 15-60 minutes with the managed connector’s sync interval)
  • Reliability: Zero missed records in the first month (the weekly full reconciliation has found 0 discrepancies so far)
  • Cost: Cloud Run costs about 8 euros/month (min-instances during business hours). The managed connector was significantly more.
  • dbt changes: One line changed (dataset name in source YAML). Zero downstream model changes.

Lessons

  1. Match the CDC column naming exactly. If your dbt models expect _fivetran_active, give them _fivetran_active. Renaming columns to be “more correct” is not worth the cascade of downstream changes.
  2. Never trust webhook payloads as source of truth. Always re-fetch the full record from the API. Webhooks tell you what changed; the API tells you the current state.
  3. Row hashing gives you idempotency for free. Hash the record content, compare before writing. This handles duplicate webhooks, out-of-order delivery, and repeated syncs without any special logic.
  4. Full reconciliation is your safety net. Even with perfect webhook handling, run a scheduled full sync. It catches edge cases you have not thought of yet.
  5. Min-instance scheduling saves money and latency. Pay for a warm instance during working hours when webhooks are frequent. Scale to zero at night when support tickets are not being updated.
  6. Store OAuth refresh tokens in Secret Manager, not in environment variables. When a refresh token rotates, you need to persist the new one. Environment variables are immutable per Cloud Run revision. Secret Manager lets you update tokens without redeploying.

The pattern is generic enough that I have since used the same codebase structure to replace a second managed connector for CRM data. Same CDC column naming trick, same webhook + reconciliation architecture, same zero-dbt-change migration. If you are paying for a managed connector that syncs fewer than 10 tables, it is worth evaluating whether a custom Cloud Run service would be simpler and cheaper.
