
Building an AI-Powered Data Assistant for the Team

by Marc

Our data team of four was drowning in ad-hoc requests. “What was the revenue last month?” “How many active customers do we have in Germany?” “What’s the conversion rate for Q4?” Each question meant someone opening a SQL editor, writing a query, formatting the results, and posting them in Slack. So I built an AI-powered data assistant: a Slack bot backed by Claude and BigQuery that lets anyone on the team ask business questions in natural language and get SQL-backed answers in seconds.

Architecture Overview

The bot runs on Cloud Run as a single Python service. The stack is slack-bolt for Slack event handling, FastAPI for the HTTP endpoints (Cloud Run expects the container to serve HTTP, whereas Slack's Socket Mode uses an outbound WebSocket connection), the anthropic SDK for Claude API calls, and google-cloud-bigquery for running queries. In development mode, the bot switches to Socket Mode so it can be tested locally without a public URL.

The flow: a user mentions the bot in Slack or sends it a direct message. The bot receives the event, constructs a system prompt with schema knowledge and business context, calls Claude with tool-use enabled, and Claude generates SQL. The bot executes that SQL against BigQuery (read-only), formats the results, and posts them back to Slack. If Claude needs to refine the query, it can call the BigQuery tool multiple times in a loop.
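The receiving step can be sketched as a few pure functions over the Slack event payload. This is a minimal sketch rather than the bot's actual handler: the function names are hypothetical, and it assumes the standard Events API fields (type, text, channel_type, bot_id) and Slack's <@USERID> mention syntax.

```python
import re

# Slack encodes a mention as "<@U...>" in the event text; some payloads
# append "|name" inside the brackets.
MENTION_RE = re.compile(r"<@[A-Z0-9]+(?:\|[^>]+)?>")


def extract_question(event: dict) -> str:
    """Remove mentions and surrounding whitespace from the message text."""
    return MENTION_RE.sub("", event.get("text", "")).strip()


def is_direct_message(event: dict) -> bool:
    """Message events from a DM carry channel_type == "im"."""
    return event.get("channel_type") == "im"


def should_handle(event: dict) -> bool:
    """Answer app mentions anywhere and plain messages only in DMs.

    Events posted by bots (including this one) carry a bot_id and are
    skipped so the bot never replies to itself.
    """
    if event.get("bot_id"):
        return False
    return event.get("type") == "app_mention" or is_direct_message(event)
```

For an app_mention event whose text is "<@U0LAN0Z89> What was revenue last month?", extract_question returns "What was revenue last month?", which is the string that would be passed to call_claude as the question.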

The Schema Problem: 7,075 Columns

The data warehouse has 197 tables across 5 schemas, totalling 7,075 columns. My first approach was to embed the full schema in the system prompt. That worked, but cost about 22,000 tokens per request just for schema context — before the user even asked a question. At hundreds of requests per week, this was burning through the API budget fast.

The solution was a runtime lookup_schema tool. Instead of embedding all columns in the prompt, I generate a JSON schema index at deploy time and let Claude search it on demand. This dropped the per-request token cost from ~22K to ~14K (the system prompt still includes business rules and query patterns, but not the full column listing).

Schema Index Generation

"""generate_schema.py — Build schema index from BigQuery INFORMATION_SCHEMA.

Run at deploy time. Queries all production schemas, merges BI tool
descriptions where available, writes schema_index.json.
"""

from google.cloud import bigquery
import json

SCHEMAS = [
    "dwh_analytics",
    "dwh_analytics_bizlogic",
    "dwh_reporting",
    "dwh",
    "dwh_analytics_views",
]
PROJECT = "production-project"

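def build_schema_index():
    client = bigquery.Client()
    index = {}

    for schema in SCHEMAS:
        # BigQuery's INFORMATION_SCHEMA.COLUMNS has no description column;
        # descriptions live in COLUMN_FIELD_PATHS, so join the two views.
        # field_path = column_name keeps only top-level columns.
        query = f"""
            SELECT c.table_name, c.column_name, c.data_type, p.description
            FROM `{PROJECT}.{schema}.INFORMATION_SCHEMA.COLUMNS` AS c
            LEFT JOIN `{PROJECT}.{schema}.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS` AS p
              ON p.table_name = c.table_name
             AND p.column_name = c.column_name
             AND p.field_path = c.column_name
            ORDER BY c.table_name, c.ordinal_position
        """
        rows = client.query(query).result()

        for row in rows:
            # Key by schema-qualified name so identically named tables in
            # different schemas are not merged into one entry.
            key = f"{schema}.{row.table_name}"
            if key not in index:
                index[key] = {
                    "schema": schema,
                    "table": row.table_name,
                    "columns": []
                }
            index[key]["columns"].append({
                "name": row.column_name,
                "type": row.data_type,
                "description": row.description or ""
            })

    # Merge BI tool descriptions if available. The file maps a table name
    # (looked up schema-qualified first, then bare) to {column: description}.
    try:
        with open("aml_descriptions.json") as f:
            aml = json.load(f)
        for key, entry in index.items():
            desc_map = aml.get(key) or aml.get(entry["table"]) or {}
            for col in entry["columns"]:
                if col["name"] in desc_map:
                    col["description"] = desc_map[col["name"]]
    except FileNotFoundError:
        pass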

    with open("schema_index.json", "w") as f:
        json.dump(index, f, indent=2)

    total_cols = sum(len(t["columns"]) for t in index.values())
    print(f"Index: {len(index)} tables, {total_cols} columns")

if __name__ == "__main__":
    build_schema_index()

The bot then exposes a lookup_schema tool that Claude can call. It accepts a search term (table name, column name, or keyword) and returns matching tables and columns. Claude typically calls it once or twice per question, pulling in only the columns it needs.
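A minimal sketch of what the tool definition and the search behind it could look like, assuming the index format written by generate_schema.py (a mapping from table key to a dict with "schema" and "columns"). The description text, the MAX_MATCHES cap, load_schema_index, and plain substring matching are assumptions, not the bot's actual implementation; the optional index argument keeps the one-argument call used in call_claude working.

```python
import json
from typing import Optional

# Tool definition in the Anthropic tool-use format (name, description,
# input_schema). The description wording is illustrative.
LOOKUP_SCHEMA_TOOL = {
    "name": "lookup_schema",
    "description": (
        "Search the data warehouse schema index by table name, column name "
        "or keyword. Returns matching tables and their columns."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "search_term": {"type": "string", "description": "Table, column or keyword"},
        },
        "required": ["search_term"],
    },
}

MAX_MATCHES = 10  # hypothetical cap on tables returned per search
_INDEX_CACHE: Optional[dict] = None


def load_schema_index(path: str = "schema_index.json") -> dict:
    """Load the deploy-time index once and keep it in memory."""
    global _INDEX_CACHE
    if _INDEX_CACHE is None:
        with open(path, encoding="utf-8") as f:
            _INDEX_CACHE = json.load(f)
    return _INDEX_CACHE


def search_schema_index(search_term: str, index: Optional[dict] = None) -> dict:
    """Case-insensitive substring search over table names, column names and descriptions.

    A table whose name matches is returned with all its columns; otherwise
    only the matching columns are returned.
    """
    if index is None:
        index = load_schema_index()
    term = search_term.strip().lower()
    if not term:
        return {"search_term": search_term, "matches": []}
    matches = []
    for table, entry in index.items():
        if term in table.lower():
            cols = entry["columns"]
        else:
            cols = [
                c for c in entry["columns"]
                if term in c["name"].lower()
                or term in (c.get("description") or "").lower()
            ]
        if cols:
            matches.append({"table": table, "schema": entry["schema"], "columns": cols})
            if len(matches) >= MAX_MATCHES:
                break
    return {"search_term": search_term, "matches": matches}
```

Returning a whole table on a table-name match, but only the matching columns otherwise, keeps tool results small while still giving Claude the full column list when it asks for a specific table.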

Streaming Responses

Nobody wants to stare at a blank Slack message for 15 seconds. The bot uses Claude’s streaming API with an on_progress callback that updates the Slack message every 1.5 seconds as text is generated. During tool execution (BigQuery queries), it shows a status indicator so the user knows the bot is working.
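The 1.5-second cadence amounts to a throttle between the token stream and Slack's message-update call. The sketch below shows that throttle in isolation; ThrottledUpdater, its push callback, and the status formatting are hypothetical. In the real bot the chunks would come from the Anthropic SDK's streaming interface (for example iterating stream.text_stream inside client.messages.stream(...)), and push would wrap Slack's chat.update.

```python
import time
from typing import Callable, Optional


class ThrottledUpdater:
    """Collect streamed text and push it at most once per `interval` seconds.

    push: callback that updates the Slack message (hypothetical wrapper
    around chat.update). clock: injectable time source, useful in tests.
    """

    def __init__(
        self,
        push: Callable[[str], None],
        interval: float = 1.5,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.push = push
        self.interval = interval
        self.clock = clock
        self.text = ""
        self._last_push: Optional[float] = None
        self._last_message: Optional[str] = None

    def on_progress(self, delta: str) -> None:
        """Append a streamed chunk; push only if the interval has elapsed."""
        self.text += delta
        now = self.clock()
        if self._last_push is None or now - self._last_push >= self.interval:
            self._send(self.text, now)

    def show_status(self, status: str) -> None:
        """Show an italic status line (e.g. while a BigQuery query runs)."""
        message = f"{self.text}\n_{status}_" if self.text else f"_{status}_"
        self._send(message, self.clock())

    def finish(self) -> None:
        """Push the final text unless it is exactly what is already shown."""
        if self.text != self._last_message:
            self._send(self.text, self.clock())

    def _send(self, message: str, now: float) -> None:
        self.push(message)
        self._last_message = message
        self._last_push = now
```

Every push is a Slack API call, which is why updates are throttled instead of sent per chunk; finish() makes sure the final answer, not a leftover status line, is what stays in the channel.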

The Core Loop

The call_claude function is the heart of the bot. It runs a tool-use loop: send a message to Claude, check whether Claude wants to use a tool (BigQuery query or schema lookup), execute the tool, feed the result back, and repeat until Claude produces a final text response. The listing below leaves out the streaming callback.

async def call_claude(question: str, conversation: list, request_id: str):
    """Call Claude with tool-use loop. Returns final response text.

    Assumes `client` is an anthropic.AsyncAnthropic instance; a synchronous
    client here would block the event loop on every API call.
    """

    messages = conversation + [{"role": "user", "content": question}]
    tools = [BIGQUERY_TOOL, LOOKUP_SCHEMA_TOOL]
    total_tokens = {"input": 0, "output": 0}
    tool_calls = []

    for _ in range(MAX_TOOL_ITERATIONS):  # safety limit: 8
        response = await client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=4096,
            system=build_system_prompt(),
            messages=messages,
            tools=tools,
        )

        total_tokens["input"] += response.usage.input_tokens
        total_tokens["output"] += response.usage.output_tokens

        # Check if Claude wants to use a tool
        tool_use_blocks = [
            b for b in response.content if b.type == "tool_use"
        ]

        if not tool_use_blocks:
            # Final text response: join all text blocks (there may be several)
            text = "\n".join(
                b.text for b in response.content if b.type == "text"
            )
            log_usage(request_id, total_tokens, tool_calls)
            return text

        # Execute each tool call
        tool_results = []
        for block in tool_use_blocks:
            tool_calls.append(block.name)

            if block.name == "run_bigquery":
                result = await execute_bigquery(
                    block.input["sql"], request_id
                )
            elif block.name == "lookup_schema":
                result = search_schema_index(block.input["search_term"])
            else:
                result = {"error": f"Unknown tool: {block.name}"}

            tool_results.append({
                "type": "tool_result",
                "tool_use_id": block.id,
                # default=str handles values json can't encode (dates, Decimals)
                "content": json.dumps(result, default=str)[:MAX_TOOL_RESULT_BYTES],
            })

        # Feed tool results back to Claude
        messages.append({"role": "assistant", "content": response.content})
        messages.append({"role": "user", "content": tool_results})

    # Log runs that hit the limit too, so they still show up in cost tracking
    log_usage(request_id, total_tokens, tool_calls)
    return "I hit the maximum number of tool calls. Please simplify your question."
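call_claude relies on BIGQUERY_TOOL and MAX_TOOL_RESULT_BYTES, which aren't shown. The sketch below is one plausible shape for them, not the bot's definitions: the description text, the 50,000 limit, and serialize_tool_result are assumptions. It also shows a byte-accurate alternative to slicing the JSON string, which counts characters rather than bytes and gives the model no hint that rows were cut off.

```python
import json

# One plausible definition in the Anthropic tool-use format. The name matches
# the "run_bigquery" branch in call_claude; the description is an assumption.
BIGQUERY_TOOL = {
    "name": "run_bigquery",
    "description": (
        "Run one read-only BigQuery SELECT statement and return the result "
        "rows as JSON. Use lookup_schema first to find table and column names."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "sql": {
                "type": "string",
                "description": "A single BigQuery SQL SELECT or WITH statement.",
            },
        },
        "required": ["sql"],
    },
}

MAX_TOOL_RESULT_BYTES = 50_000  # hypothetical limit


def serialize_tool_result(result, max_bytes: int = MAX_TOOL_RESULT_BYTES) -> str:
    """JSON-encode a tool result, capped at max_bytes of UTF-8.

    ensure_ascii=False keeps non-ASCII text (e.g. umlauts) compact, so
    characters and bytes can differ; the cap is applied to encoded bytes and
    errors="ignore" drops a multi-byte character split by the cut.
    """
    text = json.dumps(result, ensure_ascii=False, default=str)
    encoded = text.encode("utf-8")
    if len(encoded) <= max_bytes:
        return text
    marker = "...[truncated]"
    budget = max(max_bytes - len(marker), 0)  # marker is ASCII: 1 byte per char
    return encoded[:budget].decode("utf-8", errors="ignore") + marker
```

In call_claude, the tool_result content would then be serialize_tool_result(result).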

SQL Validation: The Security Layer

Letting an LLM generate SQL that runs against your production database is terrifying. The bot has a strict validation layer that runs before every query reaches BigQuery.

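import re

ALLOWED_SCHEMAS = [
    "dwh_analytics",
    "dwh_analytics_bizlogic",
    "dwh_reporting",
    "dwh",
    "dwh_analytics_views",
]

# Patterns that should never appear in a read-only query.
# \bEXEC\b does not match EXECUTE, so EXECUTE (IMMEDIATE) is listed separately.
BLOCKED_PATTERNS = [
    r'\bINSERT\b', r'\bUPDATE\b', r'\bDELETE\b', r'\bDROP\b',
    r'\bCREATE\b', r'\bALTER\b', r'\bTRUNCATE\b', r'\bMERGE\b',
    r'\bGRANT\b', r'\bREVOKE\b', r'\bEXEC\b', r'\bEXECUTE\b', r'\bCALL\b',
    r'\bDECLARE\b', r'\bSET\b(?!\s+@@)',  # allow SET for BQ options
]

MAX_BYTES = 10_000_000  # 10MB result limit

def validate_sql(sql: str) -> tuple[bool, str]:
    """Validate SQL before execution. Returns (ok, error_message).

    These regex checks are best-effort; the read-only BigQuery credentials
    remain the real security boundary.
    """
    sql = sql.strip().rstrip(";").strip()
    sql_upper = sql.upper()

    # Must be a SELECT or WITH statement
    if not (sql_upper.startswith("SELECT") or sql_upper.startswith("WITH")):
        return False, "Only SELECT queries are allowed."

    # Reject multi-statement scripts such as "SELECT 1; DROP TABLE ...".
    # (This also rejects a semicolon inside a string literal.)
    if ";" in sql:
        return False, "Only a single statement is allowed."

    # Check for DDL/DML keywords
    for pattern in BLOCKED_PATTERNS:
        if re.search(pattern, sql_upper):
            return False, f"Blocked SQL pattern detected: {pattern}"

    # Verify all referenced tables are in allowed schemas. The dataset is the
    # second-to-last part of a backtick-quoted name, which covers both
    # `project.dataset.table` and `dataset.table`. Unquoted names and
    # per-part quoting (`project`.`dataset`.`table`) are not covered.
    for ref in re.findall(r'`([^`]+)`', sql):
        parts = ref.split(".")
        if len(parts) < 2:
            continue  # single identifier: a column, alias or bare table name
        schema = parts[-2]
        if schema not in ALLOWED_SCHEMAS:
            return False, f"Access denied to schema: {schema}"

    return True, ""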
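One weakness of running BLOCKED_PATTERNS over the raw upper-cased SQL is that keywords inside string literals and comments also match, so a query filtering on WHERE action = 'delete' gets rejected. A common mitigation, sketched here as an assumption rather than part of the bot, is to blank out literals and drop comments before the keyword scan. The helper name is hypothetical; it handles ordinary quoted strings and BigQuery's three comment styles, but not every literal form (raw and triple-quoted strings can confuse it).

```python
import re

# Alternatives are tried left to right at each position, so "--" inside a
# string is consumed as part of the string, and a quote inside a comment is
# consumed as part of the comment. Backtick identifiers are left alone.
_LITERALS_AND_COMMENTS = re.compile(
    r"'(?:[^'\\]|\\.)*'"    # single-quoted string, backslash escapes allowed
    r'|"(?:[^"\\]|\\.)*"'   # double-quoted string
    r"|--[^\n]*"            # -- line comment
    r"|#[^\n]*"             # # line comment
    r"|/\*.*?\*/",          # /* block comment */
    re.DOTALL,
)


def strip_literals_and_comments(sql: str) -> str:
    """Replace string literals with '' and comments with a space."""
    def repl(match: re.Match) -> str:
        token = match.group(0)
        return "''" if token[0] in "'\"" else " "
    return _LITERALS_AND_COMMENTS.sub(repl, sql)
```

In validate_sql, the keyword loop would then search strip_literals_and_comments(sql).upper() instead of sql_upper. The trade-off: if this regex and BigQuery's lexer disagree about where a literal ends, a keyword could be hidden from the check, so this only makes sense with read-only credentials still in place behind it.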

The Corrections File

LLMs make mistakes. The same mistakes, repeatedly. After watching the bot get certain metrics wrong over and over, I created a corrections file: a high-priority override document that gets loaded before the general knowledge base in the system prompt. It contains the 30 most commonly asked metrics with their exact SQL patterns, deprecated field warnings, and default filter rules. When someone asks about revenue, the bot no longer guesses at the SQL; it follows the correction that says “revenue requires these 4 components, no stage filter, use this exact CASE statement.”

This corrections file is editable without touching the main knowledge base. Quick fixes go there first, and they take effect on the next deploy without rewriting the system prompt.
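The ordering described above (corrections first, then the general knowledge base) can be sketched as a small prompt assembler. The file names, section headings, and the split into a pure assemble_system_prompt plus a file-reading build_system_prompt are assumptions; only the ordering comes from the bot's design.

```python
from pathlib import Path

# Hypothetical file names; the post only says corrections load first.
CORRECTIONS_PATH = Path("corrections.md")
KNOWLEDGE_BASE_PATH = Path("knowledge_base.md")


def assemble_system_prompt(corrections: str, knowledge_base: str) -> str:
    """Place corrections ahead of the general knowledge base.

    The heading tells the model that corrections win on conflict.
    """
    sections = []
    if corrections.strip():
        sections.append(
            "## Corrections (highest priority; these override the knowledge base)\n"
            + corrections.strip()
        )
    sections.append("## Knowledge base\n" + knowledge_base.strip())
    return "\n\n".join(sections)


def build_system_prompt(
    corrections_path: Path = CORRECTIONS_PATH,
    knowledge_path: Path = KNOWLEDGE_BASE_PATH,
) -> str:
    """Read both files; a missing corrections file is treated as empty."""
    corrections = (
        corrections_path.read_text(encoding="utf-8")
        if corrections_path.exists() else ""
    )
    return assemble_system_prompt(
        corrections, knowledge_path.read_text(encoding="utf-8")
    )
```

Since call_claude calls build_system_prompt() on every loop iteration, this sketch rereads both files each time; building the prompt once at startup would match the "takes effect on the next deploy" behaviour just as well.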

Usage Logging

Every API call is logged to a BigQuery table with: request ID, user ID, model name, input/output tokens, estimated cost, the full request and response, which tools were called, duration, and whether it succeeded. A separate query log table captures every BigQuery and schema lookup call, tagged with the request ID of the Claude call that triggered it (one Claude call can trigger many BigQuery queries). This gives full observability into what the bot is doing, how much it costs, and where it makes mistakes.
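A sketch of building one usage-log row, assuming the total_tokens dict shape from call_claude ({"input": ..., "output": ...}). The function and field names are hypothetical, and prices are passed in rather than hard-coded because per-token prices change. The resulting dict could be written with google-cloud-bigquery's Client.insert_rows_json(table_id, [row]), which returns a list of insert errors (empty on success).

```python
from datetime import datetime, timezone


def estimate_cost_usd(
    input_tokens: int,
    output_tokens: int,
    input_price_per_mtok: float,
    output_price_per_mtok: float,
) -> float:
    """Tokens times per-million-token prices; pass the model's current prices in."""
    return (
        input_tokens * input_price_per_mtok
        + output_tokens * output_price_per_mtok
    ) / 1_000_000


def build_usage_row(
    request_id: str,
    user_id: str,
    model: str,
    total_tokens: dict,
    prices: tuple,
    tool_calls: list,
    duration_s: float,
    success: bool,
    request_text: str,
    response_text: str,
) -> dict:
    """One row for the usage table; prices is (input, output) per million tokens."""
    cost = estimate_cost_usd(total_tokens["input"], total_tokens["output"], *prices)
    return {
        "request_id": request_id,
        "logged_at": datetime.now(timezone.utc).isoformat(),
        "user_id": user_id,
        "model": model,
        "input_tokens": total_tokens["input"],
        "output_tokens": total_tokens["output"],
        "estimated_cost_usd": round(cost, 6),
        "tool_calls": tool_calls,
        "duration_s": round(duration_s, 3),
        "success": success,
        "request": request_text,
        "response": response_text,
    }
```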

Security Smoke Tests

Every deploy runs 11 automated security tests: SQL injection attempts, DDL/DML blocking, schema access restrictions, prompt injection detection, cron endpoint authentication enforcement, secret leak detection in responses, BigQuery byte limits, timing-safe comparison verification, user allowlist validation, and schema index loading checks. These run as a /smoke-test endpoint that the deploy script hits before routing traffic to the new revision.
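Two of these checks, timing-safe cron authentication and secret leak detection, reduce to small helpers that a smoke test can exercise directly. This is a sketch under assumptions: the function names and the secret patterns (Slack xox*/xapp- tokens, Anthropic sk-ant- keys, PEM private keys such as those in service-account key files) are illustrative, not the bot's actual list.

```python
import hmac
import re
from typing import Optional


def cron_request_authorized(header_value: Optional[str], expected_secret: Optional[str]) -> bool:
    """Compare a shared-secret header against the expected value in constant time."""
    if not header_value or not expected_secret:
        return False
    # Encode first: compare_digest rejects str arguments with non-ASCII characters.
    return hmac.compare_digest(header_value.encode(), expected_secret.encode())


# Illustrative credential shapes that should never appear in a bot reply.
SECRET_PATTERNS = [
    re.compile(r"\bxox[bpas]-[A-Za-z0-9-]{10,}"),   # Slack bot/user tokens
    re.compile(r"\bxapp-[A-Za-z0-9-]{10,}"),        # Slack app-level tokens
    re.compile(r"\bsk-ant-[A-Za-z0-9_-]{10,}"),     # Anthropic API keys
    re.compile(r"-----BEGIN (?:RSA )?PRIVATE KEY-----"),
]


def find_secret_leaks(text: str) -> list:
    """Return the patterns that match, so a smoke test can report which one fired."""
    return [p.pattern for p in SECRET_PATTERNS if p.search(text)]
```

hmac.compare_digest is designed to avoid the early exit of ==, so response timing doesn't reveal how many leading characters of a guessed secret were correct.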

Results

  • 86 out of 100 automated test questions answered correctly (manual evaluation against known-good SQL)
  • 29 key dashboard metrics verified with zero differences against the BI tool
  • Average response time: 8-12 seconds for simple queries, 15-25 for complex multi-step ones
  • Token cost: ~14K tokens per request (down from 22K before the schema index optimisation)
  • The bot handles about 50-80 questions per week from a team of 15 people

The biggest lesson: the corrections file matters more than the model choice. Upgrading from a cheaper model to a smarter one improved accuracy by maybe 10%. The corrections file — documenting the exact SQL for the 30 most-asked metrics — improved it by another 25%. Domain knowledge beats model size every time.
