
Incremental Sync: From 35 Minutes to 5 Seconds

by Marc

Our helpdesk ticket sync to BigQuery was broken. The Cloud Function responsible for it was hitting a 504 gateway timeout after 900 seconds — and the root cause was simple: it was listing every single ticket via the API on every run. With 52,000+ tickets and a page size of 50, that’s over 1,040 API calls before any data lands in BigQuery. Around 35 minutes of sequential HTTP requests, every single time.

The Original Architecture

The sync function worked in three phases:

  1. List all tickets — paginate through the entire helpdesk API (50 tickets per page, ~1,040 pages)
  2. Fetch details — for each ticket, call the detail endpoint to get all fields
  3. Load to BigQuery — write the results to a table using WRITE_TRUNCATE

Phase 1 alone took about 35 minutes. The function had a 900-second timeout, so it never completed. Even if we raised the timeout, 35+ minutes for a sync that should run every few hours is absurd.
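A quick back-of-the-envelope check shows how these figures fit together. It uses only the numbers in this post (52,000 tickets, about 35 minutes of listing, a 900-second timeout) and assumes that per-request latency stays roughly constant as the page size changes. That assumption is mine, not something measured here.

```python
import math

TICKETS = 52_000
TIMEOUT_S = 900
OBSERVED_LISTING_S = 35 * 60  # ~35 minutes for the list phase at 50 per page


def pages_needed(total_items: int, page_size: int) -> int:
    """Number of sequential list calls needed to page through every item."""
    return math.ceil(total_items / page_size)


# Implied latency per request, derived from the observed 35 minutes.
seconds_per_call = OBSERVED_LISTING_S / pages_needed(TICKETS, 50)

for page_size in (50, 100):
    pages = pages_needed(TICKETS, page_size)
    est_seconds = pages * seconds_per_call  # assumes latency doesn't grow with page size
    print(f"page_size={page_size}: {pages} calls, ~{est_seconds / 60:.1f} min, "
          f"fits in timeout: {est_seconds <= TIMEOUT_S}")
# page_size=50: 1040 calls, ~35.0 min, fits in timeout: False
# page_size=100: 520 calls, ~17.5 min, fits in timeout: False
```

At roughly 2 seconds per call, doubling the page size still leaves the listing at about 17 minutes. That is well over the timeout, so changing the page size alone could never have fixed the sync.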

The Fix: Incremental Sync

The helpdesk API has a search endpoint that supports a modifiedTimeRange parameter. Instead of listing everything, we can ask: “give me only tickets modified since the last time I synced.” For a sync running every 2 hours, that’s typically 10-50 tickets instead of 52,000.

Getting the High Watermark from BigQuery

First, we need a starting point for the search: the newest change we already have. I query BigQuery for the most recent modifiedTime across all synced tickets and use it as the high watermark:

from google.cloud import bigquery
from datetime import datetime, timezone, timedelta

def get_last_modified_time(bq_client, table_id):
    """Get the most recent modifiedTime from the BigQuery table."""
    query = f"""
        SELECT MAX(modifiedTime) as last_modified
        FROM `{table_id}`
    """
    result = list(bq_client.query(query).result())

    if result and result[0].last_modified:
        return result[0].last_modified
    else:
        # No data yet — fall back to full sync
        return None

The Incremental Fetch

Here’s the core incremental sync logic. The search API accepts an ISO 8601 timestamp range and returns only matching tickets:

import logging
from datetime import datetime, timedelta, timezone

import requests

logger = logging.getLogger(__name__)

API_BASE = "https://api.helpdesk-provider.com/v2"
PAGE_SIZE = 100  # increased from 50

def fetch_tickets_incremental(api_key, since_timestamp):
    """
    Fetch only tickets modified since the given timestamp.
    Uses the search API with modifiedTimeRange filter.
    Falls back to full listing on any search API error.
    """
    try:
        return _search_modified_tickets(api_key, since_timestamp)
    except Exception as e:
        logger.warning(
            f"Search API failed ({e}), falling back to full listing"
        )
        return _list_all_tickets(api_key)


def _search_modified_tickets(api_key, since_timestamp):
    """
    Use the search endpoint with modifiedTimeRange to get
    only recently changed tickets.
    """
    headers = {"Authorization": f"Bearer {api_key}"}

    # Add a small overlap buffer to avoid missing edge cases
    # (tickets modified in the same second as our watermark)
    buffer = timedelta(minutes=5)
    since_str = (since_timestamp - buffer).strftime("%Y-%m-%dT%H:%M:%SZ")
    now_str = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    all_tickets = []
    page_token = None

    while True:
        params = {
            "modifiedTimeRange": f"{since_str},{now_str}",
            "sortBy": "modifiedTime",
            "pageSize": PAGE_SIZE,
        }
        if page_token:
            params["pageToken"] = page_token

        response = requests.get(
            f"{API_BASE}/tickets/search",
            headers=headers,
            params=params,
            timeout=30,
        )
        response.raise_for_status()
        data = response.json()

        tickets = data.get("tickets", [])
        all_tickets.extend(tickets)
        logger.info(
            f"Fetched page with {len(tickets)} tickets "
            f"(total so far: {len(all_tickets)})"
        )

        page_token = data.get("nextPageToken")
        if not page_token:
            break

    logger.info(
        f"Incremental sync complete: {len(all_tickets)} tickets "
        f"modified since {since_str}"
    )
    return all_tickets


def _list_all_tickets(api_key):
    """
    Full listing fallback — paginate through all tickets.
    Used when search API is unavailable or errors.
    """
    headers = {"Authorization": f"Bearer {api_key}"}
    all_tickets = []
    page_token = None

    while True:
        params = {"pageSize": PAGE_SIZE}
        if page_token:
            params["pageToken"] = page_token

        response = requests.get(
            f"{API_BASE}/tickets",
            headers=headers,
            params=params,
            timeout=30,
        )
        response.raise_for_status()
        data = response.json()

        tickets = data.get("tickets", [])
        all_tickets.extend(tickets)

        page_token = data.get("nextPageToken")
        if not page_token:
            break

    logger.info(f"Full sync complete: {len(all_tickets)} tickets")
    return all_tickets
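One detail in `_search_modified_tickets` deserves care. `strftime("%Y-%m-%dT%H:%M:%SZ")` stamps a literal `Z` on whatever datetime it is given. The BigQuery client returns TIMESTAMP values as UTC-aware datetimes, so the code works here. A naive or non-UTC datetime, however, would be mislabelled without any error. Below is a minimal sketch of a window builder that normalises to UTC first. `build_modified_time_range` is a hypothetical helper, and the `start,end` format simply mirrors the `modifiedTimeRange` value used above, not a documented API contract.

```python
from datetime import datetime, timedelta, timezone

ISO_Z = "%Y-%m-%dT%H:%M:%SZ"


def build_modified_time_range(since, now=None, overlap=timedelta(minutes=5)):
    """Return a 'start,end' string for a modifiedTimeRange-style filter.

    Both ends are converted to UTC before formatting, so the trailing 'Z'
    is always truthful. Naive datetimes are rejected rather than guessed at.
    Formatting drops sub-second precision; the overlap absorbs that.
    """
    if since.tzinfo is None:
        raise ValueError("since must be timezone-aware")
    if now is None:
        now = datetime.now(timezone.utc)
    elif now.tzinfo is None:
        raise ValueError("now must be timezone-aware")
    start = (since - overlap).astimezone(timezone.utc)
    end = now.astimezone(timezone.utc)
    return f"{start.strftime(ISO_Z)},{end.strftime(ISO_Z)}"


# A watermark expressed in UTC+1 still produces a correct UTC window.
cet = timezone(timedelta(hours=1))
watermark = datetime(2024, 3, 1, 15, 0, tzinfo=cet)  # 14:00 UTC
print(build_modified_time_range(
    watermark, now=datetime(2024, 3, 1, 14, 10, tzinfo=timezone.utc)
))
# → 2024-03-01T13:55:00Z,2024-03-01T14:10:00Z
```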

Detecting Missed Updates (Webhook Gap Detection)

We also have webhooks pushing real-time updates, but webhooks can fail silently. The incremental sync doubles as a safety net. After fetching modified tickets, I compare them against what’s already in BigQuery to detect any that were missed:

def detect_missed_tickets(bq_client, table_id, fetched_tickets):
    """
    Compare fetched tickets against BigQuery to find any
    where the API's modifiedTime is newer than what we have stored.
    These are tickets that webhooks may have missed.
    """
    if not fetched_tickets:
        return []

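    # Normalise IDs to strings so they match a STRING ticket_id column
    ticket_ids = [str(t["id"]) for t in fetched_tickets]

    # Build a lookup of what we fetched: ticket_id -> API modifiedTime
    fetched_lookup = {
        str(t["id"]): t["modifiedTime"] for t in fetched_tickets
    }

    # Query BQ for the current state of these tickets. Passing the IDs as
    # an array parameter avoids quoting them into the SQL string by hand.
    query = f"""
        SELECT ticket_id, modifiedTime
        FROM `{table_id}`
        WHERE ticket_id IN UNNEST(@ticket_ids)
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ArrayQueryParameter("ticket_ids", "STRING", ticket_ids)
        ]
    )
    bq_rows = {
        row.ticket_id: row.modifiedTime
        for row in bq_client.query(query, job_config=job_config).result()
    }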

    missed = []
    for ticket_id, api_modified in fetched_lookup.items():
        api_ts = datetime.fromisoformat(
            api_modified.replace("Z", "+00:00")
        )
        bq_ts = bq_rows.get(ticket_id)

        if bq_ts is None:
            # Ticket exists in API but not in BQ at all
            missed.append(ticket_id)
            logger.warning(f"Ticket {ticket_id} missing from BQ entirely")
        elif api_ts > bq_ts:
            missed.append(ticket_id)
            logger.warning(
                f"Ticket {ticket_id} stale in BQ: "
                f"API={api_ts}, BQ={bq_ts}"
            )

    if missed:
        logger.warning(
            f"Detected {len(missed)} tickets with missed updates"
        )
    return missed
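There is a caveat when this is combined with the MAX(modifiedTime) watermark. If the webhook handler writes into the same table, as the comparison above implies, every successful webhook moves the watermark forward. Suppose an update's webhook was dropped more than about 5 minutes before the newest successful one. That update then falls outside the next search window, so neither the incremental fetch nor `detect_missed_tickets` ever sees it. The sketch below simulates this with made-up, in-memory timestamps. It then compares the result against a watermark the sync job records for itself: the start time of its last successful run. Persisting that timestamp (for example in a small state table) is left out and would be an addition to the design described here.

```python
from datetime import datetime, timedelta, timezone

OVERLAP = timedelta(minutes=5)


def t(hh, mm):
    """Hypothetical UTC timestamp on a single day."""
    return datetime(2024, 3, 1, hh, mm, tzinfo=timezone.utc)


# Source of truth: what the helpdesk API reports (ticket id -> modifiedTime).
api_state = {"A": t(10, 0), "B": t(11, 55)}

# What BigQuery holds: B's webhook landed, A's webhook was dropped,
# so A still carries an older modifiedTime from an earlier sync.
bq_state = {"A": t(8, 0), "B": t(11, 55)}


def search_window_results(since):
    """Mimic the search API: tickets modified at or after since - OVERLAP."""
    start = since - OVERLAP
    return {tid for tid, ts in api_state.items() if ts >= start}


def stale(fetched):
    """Fetched tickets that BigQuery is missing or holds an older version of."""
    return {
        tid for tid in fetched
        if tid not in bq_state or api_state[tid] > bq_state[tid]
    }


# Watermark from MAX(modifiedTime): already advanced by B's webhook.
max_watermark = max(bq_state.values())
missed_by_max = stale(search_window_results(max_watermark))

# Watermark recorded by the sync itself: start of its previous run,
# two hours earlier. Using the start (not the end) also covers edits
# that happened while that run was in progress.
last_run_started = t(9, 55)
missed_by_run_time = stale(search_window_results(last_run_started))

print(sorted(missed_by_max))       # → []
print(sorted(missed_by_run_time))  # → ['A']
```

The run-time watermark produces a somewhat wider window: everything changed since the previous run. At 10-50 tickets per two hours, that is still a single page at 100 per page. In exchange, the reconciliation no longer depends on what the webhooks have or haven't written.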

The Merge (Not Truncate)

With incremental sync, we can’t use WRITE_TRUNCATE anymore — we need to merge updated tickets into the existing table. I use a BigQuery MERGE statement:

def upsert_tickets(bq_client, table_id, tickets):
    """
    Merge fetched tickets into BigQuery using a temp table + MERGE.
    """
    if not tickets:
        logger.info("No tickets to upsert")
        return

    # Load tickets into a temp table
    temp_table = f"{table_id}_staging"
    job_config = bigquery.LoadJobConfig(
        write_disposition="WRITE_TRUNCATE",
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        autodetect=False,
        schema=TICKET_SCHEMA,  # pre-defined schema
    )
    load_job = bq_client.load_table_from_json(
        tickets, temp_table, job_config=job_config
    )
    load_job.result()

    # Merge from staging into main table
    merge_query = f"""
        MERGE `{table_id}` AS target
        USING `{temp_table}` AS source
        ON target.ticket_id = source.ticket_id
        WHEN MATCHED THEN
            UPDATE SET
                subject = source.subject,
                status = source.status,
                priority = source.priority,
                assignee = source.assignee,
                modifiedTime = source.modifiedTime,
                raw_json = source.raw_json
        WHEN NOT MATCHED THEN
            INSERT (ticket_id, subject, status, priority,
                    assignee, modifiedTime, raw_json)
            VALUES (source.ticket_id, source.subject, source.status,
                    source.priority, source.assignee,
                    source.modifiedTime, source.raw_json)
    """
    bq_client.query(merge_query).result()
    logger.info(f"Merged {len(tickets)} tickets into {table_id}")
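`upsert_tickets` loads `tickets` straight into a staging table with a fixed schema, so the API payloads first have to be shaped into rows with exactly those columns. It is also worth de-duplicating by ID. On some APIs, paging through results sorted by `modifiedTime` while tickets are still being edited can return the same ticket twice. BigQuery rejects a MERGE in which more than one source row matches the same target row. Below is a minimal sketch, with several assumptions. First, the API ticket carries `id`, `subject`, `status`, `priority`, `assignee` and `modifiedTime` keys: only `id` and `modifiedTime` appear in the code above, and the rest are inferred from the MERGE columns. Second, `assignee` is a scalar. Third, `raw_json` holds the full payload as a string. `prepare_rows` is a hypothetical helper.

```python
import json
from datetime import datetime


def _parse_ts(value):
    """Parse an ISO 8601 timestamp with a trailing 'Z' into an aware datetime."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def prepare_rows(tickets):
    """Keep the newest copy of each ticket and flatten it to a staging row."""
    latest = {}
    for ticket in tickets:
        tid = ticket["id"]
        current = latest.get(tid)
        if current is None or (
            _parse_ts(ticket["modifiedTime"]) > _parse_ts(current["modifiedTime"])
        ):
            latest[tid] = ticket

    rows = []
    for tid, ticket in latest.items():
        rows.append({
            "ticket_id": str(tid),
            "subject": ticket.get("subject"),
            "status": ticket.get("status"),
            "priority": ticket.get("priority"),
            "assignee": ticket.get("assignee"),  # assumed to be a scalar value
            "modifiedTime": ticket["modifiedTime"],  # left as the API's ISO string
            "raw_json": json.dumps(ticket, sort_keys=True),
        })
    return rows


sample = [
    {"id": "T-1", "subject": "Printer", "status": "Open",
     "modifiedTime": "2024-03-01T10:00:00Z"},
    {"id": "T-1", "subject": "Printer", "status": "Closed",
     "modifiedTime": "2024-03-01T10:05:00Z"},
    {"id": "T-2", "subject": "VPN", "status": "Open",
     "modifiedTime": "2024-03-01T09:00:00Z"},
]
rows = prepare_rows(sample)
print(len(rows), rows[0]["status"])  # → 2 Closed
```

With a helper like this in place, the call in the entry point would become `upsert_tickets(bq_client, table_id, prepare_rows(tickets))`.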

Putting It All Together

def sync_tickets(request):
    """Cloud Function entry point."""
    bq_client = bigquery.Client()
    table_id = "my-project.helpdesk_data.tickets"

    # Step 1: Get high watermark
    last_modified = get_last_modified_time(bq_client, table_id)

    if last_modified:
        logger.info(f"Incremental sync from {last_modified}")
        tickets = fetch_tickets_incremental(API_KEY, last_modified)
    else:
        logger.info("No watermark found — full sync")
        tickets = _list_all_tickets(API_KEY)

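    # Step 2: Detect webhook misses. Skipped on a full sync: with an empty
    # table, every fetched ticket would be reported as missing.
    missed = (
        detect_missed_tickets(bq_client, table_id, tickets)
        if last_modified
        else []
    )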
    if missed:
        logger.warning(f"Will update {len(missed)} missed tickets")

    # Step 3: Upsert to BigQuery
    upsert_tickets(bq_client, table_id, tickets)

    return {
        "status": "ok",
        "tickets_synced": len(tickets),
        "missed_updates_detected": len(missed),
        "mode": "incremental" if last_modified else "full",
    }

The Results

The numbers speak for themselves:

  • Full sync (before): ~1,040 API calls, ~35 minutes, frequent 504 timeouts
  • Incremental sync (after): 1-5 API calls, under 5 seconds, zero timeouts
  • Page size increase (50 to 100): halved the number of pages even for the full-sync fallback
  • Webhook gap detection: caught 3 missed updates in the first week alone

Design Decisions Worth Noting

The 5-Minute Overlap Buffer

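The timedelta(minutes=5) buffer on the watermark is deliberate. Suppose the last sync stored a ticket modified at 14:00:00, while another ticket modified at that same moment (or a second earlier) only became visible to the API after that sync's query ran. The next watermark is 14:00:00, so a search window starting exactly there could skip the late ticket for good. This is more likely because the range is sent at whole-second precision, and we can't be sure the API treats its start as inclusive. The 5-minute overlap means we re-fetch a few tickets we already have. The MERGE matches them on ticket_id and updates the existing rows instead of inserting duplicates. It's a cheap insurance policy.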

Robust Fallback, Not Optimistic Retry

When the search API fails, I fall back to the full listing rather than retrying the search. This is intentional. The search endpoint has different rate limits and availability characteristics than the list endpoint. If search is having issues, retrying it won’t help — but the list endpoint might be fine. The full sync is slow but reliable. Worst case, it times out and the next run tries incremental again.
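One possible refinement to the fallback in `fetch_tickets_incremental`: it catches every `Exception`. A plain bug, such as a `KeyError` on an unexpected payload, would therefore also trigger the slow full listing and hide the real error. Below is a minimal sketch of narrowing the catch. `fetch_with_fallback` is a hypothetical helper written against stand-in callables, so it runs without the helpdesk API. With `requests`, the natural tuple to pass would be `(requests.RequestException,)`. That base class covers the `HTTPError` raised by `raise_for_status()` as well as connection failures and timeouts.

```python
import logging

logger = logging.getLogger(__name__)


def fetch_with_fallback(primary, fallback, fallback_on):
    """Call primary(); on an expected transport/API error, log it and call fallback().

    Exceptions not listed in fallback_on (e.g. a KeyError from a code bug)
    propagate unchanged instead of silently triggering the slow path.
    """
    try:
        return primary()
    except fallback_on as exc:
        logger.warning("Primary fetch failed (%s), falling back", exc)
        return fallback()


def flaky_search():
    raise ConnectionError("search endpoint unavailable")


def buggy_search():
    raise KeyError("tickets")


def full_listing():
    return ["every", "ticket"]


print(fetch_with_fallback(flaky_search, full_listing, (ConnectionError, TimeoutError)))
# → ['every', 'ticket']
```

Calling it with `buggy_search` instead raises the `KeyError` straight away, which is usually what you want to see in the logs.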

Why MERGE Instead of WRITE_TRUNCATE

The original full sync could afford WRITE_TRUNCATE because it always had the complete dataset. With incremental sync, we only have the delta. A MERGE (upsert) is the natural fit: update existing rows, insert new ones. BigQuery applies a single MERGE statement atomically. The staging table also keeps the load step separate from the MERGE: if the load to staging fails, the main table is never touched.
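You can try the same upsert semantics locally without BigQuery. The sketch below uses SQLite's `INSERT ... ON CONFLICT ... DO UPDATE` (available since SQLite 3.24) as a stand-in for MERGE, with a trimmed-down, hypothetical schema. It shows why the overlap re-fetches are harmless: re-applying the same batch still leaves one row per ticket. It also adds a guard that the MERGE above doesn't have. A row is only overwritten when the incoming `modifiedTime` is at least as new, so a slower sync can't clobber a fresher webhook write. In BigQuery, the equivalent would be adding `AND source.modifiedTime >= target.modifiedTime` to the `WHEN MATCHED` clause.

```python
import sqlite3

UPSERT = """
INSERT INTO tickets (ticket_id, status, modifiedTime)
VALUES (?, ?, ?)
ON CONFLICT(ticket_id) DO UPDATE SET
    status = excluded.status,
    modifiedTime = excluded.modifiedTime
WHERE excluded.modifiedTime >= tickets.modifiedTime
"""


def upsert(conn, rows):
    """Insert new tickets; update existing ones only if the incoming copy is not older."""
    with conn:  # commits on success, rolls back on error
        conn.executemany(UPSERT, rows)


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tickets (ticket_id TEXT PRIMARY KEY, status TEXT, modifiedTime TEXT)"
)

# ISO 8601 strings in one fixed UTC format sort in the same order as the instants.
batch = [
    ("T-1", "Open", "2024-03-01T10:00:00Z"),
    ("T-2", "Open", "2024-03-01T10:02:00Z"),
]
upsert(conn, batch)
upsert(conn, batch)  # overlap re-fetch: same rows again, no duplicates

# A fresher webhook write, then a stale copy arriving from a slow sync.
upsert(conn, [("T-1", "Closed", "2024-03-01T10:30:00Z")])
upsert(conn, [("T-1", "Open", "2024-03-01T10:00:00Z")])

print(conn.execute("SELECT ticket_id, status FROM tickets ORDER BY ticket_id").fetchall())
# → [('T-1', 'Closed'), ('T-2', 'Open')]
```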

Takeaways

  • Always check if the API supports filtering by modification date. Most mature APIs do. It’s the single biggest optimisation you can make for sync jobs.
  • Build the fallback first. I kept the full listing code intact and wrapped it as the fallback path. This meant I could ship the incremental version with confidence — if anything went wrong, it would gracefully degrade.
  • Webhooks are not reliable enough to be your only sync mechanism. Use them for low-latency updates, but always have a polling-based reconciliation job that catches what webhooks miss.
  • Small page size defaults are a silent killer. The API defaulted to 50 items per page. Simply changing to 100 halved the number of round trips for any pagination-heavy operation. Always check the maximum allowed page size.

What was a 35-minute timeout nightmare is now a 5-second non-event. The function runs every 2 hours, processes a handful of tickets, and finishes before you’d notice it started.
