Our data team of four was drowning in ad-hoc requests. “What was the revenue last month?” “How many active customers do we have in Germany?” “What’s the conversion rate for Q4?” Each question meant someone opening a SQL editor, writing a query, formatting the results, and posting them in Slack. So I built an AI-powered data assistant: a Slack bot backed by Claude and BigQuery that lets anyone on the team ask business questions in natural language and get SQL-backed answers in seconds.
Architecture Overview
The bot runs on Cloud Run as a single Python service. The stack is slack-bolt for Slack event handling, FastAPI for HTTP endpoints (Cloud Run routes traffic to a container that listens for HTTP requests, so the deployed bot cannot rely on Socket Mode's outbound WebSocket), the anthropic SDK for Claude API calls, and google-cloud-bigquery for running queries. In development mode, it switches to Socket Mode for local testing without a public URL.
The flow: a user mentions the bot in Slack or sends it a direct message. The bot receives the event, constructs a system prompt with schema knowledge and business context, calls Claude with tool-use enabled, and Claude generates SQL. The bot executes that SQL against BigQuery (read-only), formats the results, and posts them back to Slack. If Claude needs to refine the query, it can call the BigQuery tool multiple times in a loop.
The Schema Problem: 7,075 Columns
The data warehouse has 197 tables across 5 schemas, totalling 7,075 columns. My first approach was to embed the full schema in the system prompt. That worked, but cost about 22,000 tokens per request just for schema context — before the user even asked a question. At hundreds of requests per week, this was burning through the API budget fast.
The solution was a runtime lookup_schema tool. Instead of embedding all columns in the prompt, I generate a JSON schema index at deploy time and let Claude search it on demand. This dropped the per-request token cost from ~22K to ~14K (the system prompt still includes business rules and query patterns, but not the full column listing).
Schema Index Generation
"""generate_schema.py — Build schema index from BigQuery INFORMATION_SCHEMA.
Run at deploy time. Queries all production schemas, merges BI tool
descriptions where available, writes schema_index.json.
"""
import json

from google.cloud import bigquery

SCHEMAS = [
    "dwh_analytics",
    "dwh_analytics_bizlogic",
    "dwh_reporting",
    "dwh",
    "dwh_analytics_views",
]
PROJECT = "production-project"


def build_schema_index():
    client = bigquery.Client()
    index = {}
    for schema in SCHEMAS:
        # Column descriptions are exposed by COLUMN_FIELD_PATHS, not COLUMNS,
        # so join the two views. Matching field_path to column_name keeps
        # top-level columns only and skips nested fields.
        query = f"""
            SELECT c.table_name, c.column_name, c.data_type, p.description
            FROM `{PROJECT}.{schema}.INFORMATION_SCHEMA.COLUMNS` AS c
            LEFT JOIN `{PROJECT}.{schema}.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS` AS p
              ON p.table_name = c.table_name AND p.field_path = c.column_name
            ORDER BY c.table_name, c.ordinal_position
        """
        rows = client.query(query).result()
        for row in rows:
            table = row.table_name
            # The index is keyed by table name alone, so two schemas holding
            # a table with the same name would share one entry.
            if table not in index:
                index[table] = {
                    "schema": schema,
                    "columns": [],
                }
            index[table]["columns"].append({
                "name": row.column_name,
                "type": row.data_type,
                "description": row.description or "",
            })

    # Merge BI tool descriptions if available
    try:
        with open("aml_descriptions.json") as f:
            aml = json.load(f)
        for table, desc_map in aml.items():
            if table in index:
                for col in index[table]["columns"]:
                    if col["name"] in desc_map:
                        col["description"] = desc_map[col["name"]]
    except FileNotFoundError:
        pass

    with open("schema_index.json", "w") as f:
        json.dump(index, f, indent=2)

    total_cols = sum(len(t["columns"]) for t in index.values())
    print(f"Index: {len(index)} tables, {total_cols} columns")


if __name__ == "__main__":
    build_schema_index()
The bot then exposes a lookup_schema tool that Claude can call. It accepts a search term (table name, column name, or keyword) and returns matching tables and columns. Claude typically calls it once or twice per question, pulling in only the columns it needs.
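To make the lookup concrete, here is a minimal sketch of a search function over the index format written by generate_schema.py. It is an illustration under assumptions, not the bot's actual code: the plain substring matching, the `MAX_MATCHES` cap, and the `load_schema_index` helper are all hypothetical, and this version takes the index as an explicit argument, whereas the call in the core loop passes only the search term.

```python
import json
from pathlib import Path

MAX_MATCHES = 20  # hypothetical cap so a broad term cannot flood the context


def load_schema_index(path: str = "schema_index.json") -> dict:
    """Load the index file produced at deploy time."""
    return json.loads(Path(path).read_text())


def search_schema_index(search_term: str, index: dict,
                        max_matches: int = MAX_MATCHES) -> dict:
    """Return tables and columns whose name or description contains the term."""
    term = search_term.lower().strip()
    if not term:
        return {"matches": [], "truncated": False}

    matches = []
    for table, info in index.items():
        table_hit = term in table.lower()
        # A table-name hit returns every column; otherwise only matching ones.
        columns = [
            col for col in info["columns"]
            if table_hit
            or term in col["name"].lower()
            or term in col["description"].lower()
        ]
        if columns:
            matches.append({
                "table": f'{info["schema"]}.{table}',
                "columns": columns,
            })

    return {
        "matches": matches[:max_matches],
        "truncated": len(matches) > max_matches,
    }
```

Returning a `truncated` flag is one way to tell Claude that a narrower search term would help, rather than silently dropping results.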
Streaming Responses
Nobody wants to stare at a blank Slack message for 15 seconds. The bot uses Claude’s streaming API with an on_progress callback that updates the Slack message every 1.5 seconds as text is generated. During tool execution (BigQuery queries), it shows a status indicator so the user knows the bot is working.
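The throttling part of this can be sketched independently of Slack and Claude. The class below is a hypothetical helper (the name `ThrottledUpdater` and its methods are my own for illustration): it buffers streamed text and invokes a callback at most once per interval, with an injectable clock so the behaviour can be checked without waiting. In the bot, the callback would be whatever edits the Slack message.

```python
import time
from typing import Callable, Optional


class ThrottledUpdater:
    """Buffer streamed text and push it to a callback at most once per interval."""

    def __init__(self, on_progress: Callable[[str], None],
                 interval: float = 1.5,
                 clock: Callable[[], float] = time.monotonic):
        self._on_progress = on_progress
        self._interval = interval
        self._clock = clock
        self._text = ""
        self._last_sent: Optional[float] = None  # None until the first push
        self._dirty = False

    def feed(self, chunk: str) -> None:
        """Add a streamed chunk; push only if the interval has elapsed."""
        self._text += chunk
        self._dirty = True
        now = self._clock()
        if self._last_sent is None or now - self._last_sent >= self._interval:
            self._push(now)

    def flush(self) -> None:
        """Push any text that arrived since the last update (call at stream end)."""
        if self._dirty:
            self._push(self._clock())

    def _push(self, now: float) -> None:
        self._on_progress(self._text)
        self._last_sent = now
        self._dirty = False
```

Calling `flush()` once the stream ends matters: without it, the last chunks that arrived inside the throttle window would never reach the user.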
The Core Loop
The call_claude function is the heart of the bot. It manages a tool-use loop: send a message to Claude, check if Claude wants to use a tool (BigQuery query or schema lookup), execute the tool, feed the result back, and repeat until Claude produces a final text response.
import json

# Assumes `client` is an anthropic.AsyncAnthropic instance, so the API call
# can be awaited without blocking the event loop (with the synchronous client,
# drop the `await`). BIGQUERY_TOOL, LOOKUP_SCHEMA_TOOL, MAX_TOOL_ITERATIONS and
# MAX_TOOL_RESULT_BYTES are defined elsewhere in the service.


async def call_claude(question: str, conversation: list, request_id: str):
    """Call Claude with tool-use loop. Returns final response text."""
    messages = conversation + [{"role": "user", "content": question}]
    tools = [BIGQUERY_TOOL, LOOKUP_SCHEMA_TOOL]
    total_tokens = {"input": 0, "output": 0}
    tool_calls = []

    for iteration in range(MAX_TOOL_ITERATIONS):  # safety limit: 8
        response = await client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=4096,
            system=build_system_prompt(),
            messages=messages,
            tools=tools,
        )
        total_tokens["input"] += response.usage.input_tokens
        total_tokens["output"] += response.usage.output_tokens

        # Check if Claude wants to use a tool
        tool_use_blocks = [
            b for b in response.content if b.type == "tool_use"
        ]
        if not tool_use_blocks:
            # Final text response: join all text blocks and return
            text = "".join(
                b.text for b in response.content if b.type == "text"
            )
            log_usage(request_id, total_tokens, tool_calls)
            return text

        # Execute each tool call
        tool_results = []
        for block in tool_use_blocks:
            tool_calls.append(block.name)
            if block.name == "run_bigquery":
                result = await execute_bigquery(
                    block.input["sql"], request_id
                )
            elif block.name == "lookup_schema":
                result = search_schema_index(block.input["search_term"])
            else:
                result = {"error": f"Unknown tool: {block.name}"}
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": block.id,
                # The slice counts characters and may cut the JSON mid-value
                "content": json.dumps(result)[:MAX_TOOL_RESULT_BYTES],
            })

        # Feed tool results back to Claude
        messages.append({"role": "assistant", "content": response.content})
        messages.append({"role": "user", "content": tool_results})

    # Log the run even when the iteration limit is reached
    log_usage(request_id, total_tokens, tool_calls)
    return "I hit the maximum number of tool calls. Please simplify your question."
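The loop refers to BIGQUERY_TOOL and LOOKUP_SCHEMA_TOOL without showing them. A plausible shape, following the name / description / input_schema layout that the Anthropic Messages API uses for tool definitions, is sketched below. The tool names and the `sql` and `search_term` inputs come from the loop itself; the description strings and the `missing_inputs` helper are my own illustrative assumptions.

```python
BIGQUERY_TOOL = {
    "name": "run_bigquery",
    # Description text is illustrative, not the bot's real wording
    "description": "Run a read-only SELECT query against the data warehouse "
                   "and return the resulting rows.",
    "input_schema": {
        "type": "object",
        "properties": {
            "sql": {"type": "string", "description": "The SQL query to run."},
        },
        "required": ["sql"],
    },
}

LOOKUP_SCHEMA_TOOL = {
    "name": "lookup_schema",
    "description": "Search the schema index for tables and columns matching "
                   "a table name, column name, or keyword.",
    "input_schema": {
        "type": "object",
        "properties": {
            "search_term": {
                "type": "string",
                "description": "Table name, column name, or keyword.",
            },
        },
        "required": ["search_term"],
    },
}


def missing_inputs(tool: dict, tool_input: dict) -> list:
    """Return the required input keys that a tool call left out."""
    required = tool["input_schema"].get("required", [])
    return [key for key in required if key not in tool_input]
```

A check like `missing_inputs` before dispatch would let the loop hand a readable error back to Claude instead of raising a KeyError on `block.input["sql"]`.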
SQL Validation: The Security Layer
Letting an LLM generate SQL that runs against your production database is terrifying. The bot has a strict validation layer that runs before every query reaches BigQuery.
import re

ALLOWED_SCHEMAS = [
    "dwh_analytics",
    "dwh_analytics_bizlogic",
    "dwh_reporting",
    "dwh",
    "dwh_analytics_views",
]

# Patterns that should never appear in a read-only query
BLOCKED_PATTERNS = [
    r'\bINSERT\b', r'\bUPDATE\b', r'\bDELETE\b', r'\bDROP\b',
    r'\bCREATE\b', r'\bALTER\b', r'\bTRUNCATE\b', r'\bMERGE\b',
    r'\bGRANT\b', r'\bREVOKE\b', r'\bEXEC\b', r'\bCALL\b',
    r'\bDECLARE\b',
    r'\bSET\b(?!\s+@@)',  # SET is blocked unless followed by an @@ system variable
]

MAX_BYTES = 10_000_000  # 10MB result limit


def validate_sql(sql: str) -> tuple[bool, str]:
    """Validate SQL before execution. Returns (ok, error_message)."""
    sql_upper = sql.upper().strip()

    # Must be a SELECT or WITH statement
    if not (sql_upper.startswith("SELECT") or sql_upper.startswith("WITH")):
        return False, "Only SELECT queries are allowed."

    # Check for DDL/DML keywords. The match runs on the whole text, so a
    # blocked word inside a string literal or comment is rejected too.
    for pattern in BLOCKED_PATTERNS:
        if re.search(pattern, sql_upper):
            return False, f"Blocked SQL pattern detected: {pattern}"

    # Verify all referenced tables are in allowed schemas.
    # Only backticked three-part names (`project.schema.table`) are matched;
    # two-part or unquoted references are not inspected by this check, so the
    # read-only BigQuery access remains the backstop.
    table_refs = re.findall(
        r'`[^`]+\.([^`]+)\.[^`]+`', sql
    )
    for schema in table_refs:
        if schema not in ALLOWED_SCHEMAS:
            return False, f"Access denied to schema: {schema}"

    return True, ""
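One side effect of a keyword blocklist is false positives: a harmless filter such as status = 'delete' contains a blocked word inside a string literal. A possible mitigation, offered here as an assumption and not as something the bot does, is to blank out string literals and comments before the keyword scan. The sketch below is a rough approximation of BigQuery's lexical rules (quoted and triple-quoted strings, `--`, `#` and `/* */` comments); `strip_literals_and_comments` is a hypothetical name, and this should be treated as a convenience on top of read-only credentials, not a security boundary.

```python
import re

# Comments: "-- ...", "# ..." to end of line, and /* ... */ blocks
_COMMENT = r"--[^\n]*|#[^\n]*|/\*.*?\*/"
# Strings: triple-quoted forms first, then single-line quoted forms
_STRING = "|".join([
    r"'''(?:\\.|[^\\])*?'''",
    r'"""(?:\\.|[^\\])*?"""',
    r"'(?:\\.|[^'\\\n])*'",
    r'"(?:\\.|[^"\\\n])*"',
])
# Backticked identifiers are matched so that quotes inside them are skipped
_IDENT = r"`[^`]*`"

_TOKEN = re.compile(
    f"(?P<comment>{_COMMENT})|(?P<string>{_STRING})|(?P<ident>{_IDENT})",
    re.DOTALL,
)


def strip_literals_and_comments(sql: str) -> str:
    """Replace string literals with '' and comments with a space."""
    def _blank(match: re.Match) -> str:
        if match.lastgroup == "comment":
            return " "
        if match.lastgroup == "string":
            return "''"
        return match.group(0)  # keep backticked identifiers untouched

    return _TOKEN.sub(_blank, sql)
```

An unterminated quote is deliberately left in place, so anything after it stays visible to the keyword scan. The schema check should still run on the original SQL, since table names live in backticked identifiers that this function preserves anyway.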
The Corrections File
LLMs make mistakes. The same mistakes, repeatedly. After watching the bot get certain metrics wrong over and over, I created a corrections file: a high-priority override document that is loaded before the general knowledge base in the system prompt. It contains the 30 most commonly asked metrics with their exact SQL patterns, deprecated field warnings, and default filter rules. When someone asks about revenue, the bot no longer guesses at the SQL; it follows the correction that says “revenue requires these 4 components, no stage filter, use this exact CASE statement.”
This corrections file is editable without touching the main knowledge base. Quick fixes go there first, and they take effect on the next deploy without rewriting the system prompt.
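The ordering is the important part, and it can be sketched in a few lines. The file names, section headings, and the `assemble_prompt` helper below are hypothetical; the only thing taken from the description above is that corrections are placed ahead of the general knowledge base.

```python
from pathlib import Path


def assemble_prompt(corrections: str, knowledge: str) -> str:
    """Place corrections ahead of the general knowledge base."""
    sections = []
    if corrections.strip():
        sections.append(
            "## CORRECTIONS (highest priority; these override anything below)\n"
            + corrections.strip()
        )
    sections.append("## GENERAL KNOWLEDGE BASE\n" + knowledge.strip())
    return "\n\n".join(sections)


def build_system_prompt(corrections_path: str = "corrections.md",
                        knowledge_path: str = "knowledge_base.md") -> str:
    """Read both files (hypothetical names) and assemble the system prompt."""
    corrections_file = Path(corrections_path)
    # A missing corrections file simply yields a prompt without that section
    corrections = corrections_file.read_text() if corrections_file.exists() else ""
    knowledge = Path(knowledge_path).read_text()
    return assemble_prompt(corrections, knowledge)
```

Keeping the assembly step separate from file reading makes it easy to check in a test that a correction really does land before the general rules.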
Usage Logging
Every API call is logged to a BigQuery table with: request ID, user ID, model name, input/output tokens, estimated cost, the full request and response, which tools were called, duration, and whether it succeeded. A separate query log table captures every BigQuery and schema lookup call with its own request ID (one Claude call can trigger many BQ queries). This gives full observability into what the bot is doing, how much it costs, and where it makes mistakes.
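As a rough illustration of what one row of the usage log might look like, here is a sketch built around the fields listed above. The field names, the `UsageRow` class, and especially the prices are assumptions: `PLACEHOLDER_PRICES` holds made-up values that would need replacing with the current published rates for whichever model is in use.

```python
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone

# Placeholder USD prices per million tokens; NOT real pricing
PLACEHOLDER_PRICES = {"input": 1.00, "output": 5.00}


def estimate_cost_usd(input_tokens: int, output_tokens: int,
                      prices: dict = PLACEHOLDER_PRICES) -> float:
    """Estimate request cost from token counts and per-million-token prices."""
    return (
        input_tokens * prices["input"] + output_tokens * prices["output"]
    ) / 1_000_000


@dataclass
class UsageRow:
    request_id: str
    user_id: str
    model: str
    input_tokens: int
    output_tokens: int
    tool_calls: list
    duration_ms: int
    success: bool
    request_text: str = ""
    response_text: str = ""
    logged_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )

    def to_json_row(self) -> dict:
        """Return a JSON-serialisable dict with the estimated cost added."""
        row = asdict(self)
        row["estimated_cost_usd"] = estimate_cost_usd(
            self.input_tokens, self.output_tokens
        )
        return row
```

A dict in this shape could then be written with the BigQuery client's `insert_rows_json` method, assuming the destination table has matching columns.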
Security Smoke Tests
Every deploy runs 11 automated security tests covering SQL injection attempts, DDL/DML blocking, schema access restrictions, prompt injection detection, cron endpoint authentication enforcement, secret leak detection in responses, BigQuery byte limits, timing-safe comparison verification, user allowlist validation, and schema index loading checks. These are exposed as a /smoke-test endpoint that the deploy script hits before routing traffic to the new revision.
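For readers unfamiliar with the timing-safe comparison item: the idea is to compare a presented secret with the expected one in a way that does not return faster when the first characters differ. Python's standard library provides `hmac.compare_digest` for this. The `is_authorized` function below is a hypothetical sketch of how a cron endpoint might use it, not the bot's actual check.

```python
import hmac
from typing import Optional


def is_authorized(provided_token: Optional[str],
                  expected_token: Optional[str]) -> bool:
    """Compare tokens without leaking how many leading characters matched."""
    if not provided_token or not expected_token:
        # A missing or empty secret on either side never authorizes
        return False
    # Encode to bytes: compare_digest accepts str only for ASCII input
    return hmac.compare_digest(
        provided_token.encode("utf-8"),
        expected_token.encode("utf-8"),
    )
```

Rejecting empty values up front avoids the failure mode where an unset environment variable makes an empty token compare equal to an empty expected secret.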
Results
- 86 out of 100 automated test questions answered correctly (manual evaluation against known-good SQL)
- 29 key dashboard metrics verified with zero differences against the BI tool
- Average response time: 8-12 seconds for simple queries, 15-25 for complex multi-step ones
- Token cost: ~14K tokens per request (down from 22K before the schema index optimisation)
- The bot handles about 50-80 questions per week from a team of 15 people
The biggest lesson: the corrections file matters more than the model choice. Upgrading from a cheaper model to a smarter one improved accuracy by maybe 10%. The corrections file — documenting the exact SQL for the 30 most-asked metrics — improved it by another 25%. Domain knowledge beats model size every time.
