Rohan Singh's Weblog

Writing about software, data science, and things I learn along the way.

12 Patterns I Learned Building an Agentic AI That Queries a 1,200-Table Data Warehouse

I'm building an AI assistant that takes plain English questions and answers them by searching a data catalog, inspecting table schemas, writing SQL, and returning results with charts. The backend is Python/FastAPI, the LLM is Claude on AWS Bedrock, and the data warehouse is Databricks with ~1,200 tables.

Over the last couple of days I hardened the agentic loop from a naive "call LLM, hope for the best" into something that's reliable, observable, and cheap to run. Here's everything I learned, with code.


1. The Core Pattern: A Bounded Tool-Use Loop

An "agent" is just a loop. You send a message to an LLM. If it wants to call a tool, you execute the tool, feed the result back, and repeat. If it's done, you return the answer.

for iteration in range(MAX_ITERATIONS):
    response = llm.converse(messages=messages, tools=tools)

    if response["stopReason"] != "tool_use":
        break  # model is done

    # Execute whatever tools the model requested
    tool_results = execute_tools(response)
    messages.append(response["output"]["message"])  # assistant's tool_use message
    messages.append(tool_results)                   # our tool_result message

The critical part: always cap iterations. Without MAX_ITERATIONS, a confused model loops forever, burning tokens and money. We cap at 20 and return a fallback message when exceeded.

MAX_ITERATIONS = 20
FALLBACK_RESPONSE = "I ran into an issue processing your request. Please try rephrasing."
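Spelled out end to end, the cap-plus-fallback behavior looks like this (a minimal sketch; run_agent and step are illustrative names, not from the orchestrator):

```python
def run_agent(step, max_iterations=20,
              fallback="I ran into an issue processing your request. "
                       "Please try rephrasing."):
    """Drive one bounded agentic loop. `step(i)` returns the final
    answer, or None when the model asked for more tool calls."""
    for iteration in range(max_iterations):
        answer = step(iteration)
        if answer is not None:
            return answer
    # Cap hit: the model never converged, so stop burning tokens
    return fallback
```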

2. Force Structured Output With a Tool, Not Parsing

Instead of hoping the model returns JSON and then parsing it out of prose, define a respond_to_user tool that forces the model to return structured data:

{
    "toolSpec": {
        "name": "respond_to_user",
        "description": "Return your final answer to the user",
        "inputSchema": {"json": {
            "type": "object",
            "properties": {
                "content":       {"type": "string"},
                "visualization": {"type": "object"},
                "followUps":     {"type": "array"},
                "sql":           {"type": "string"}
            },
            "required": ["content"]
        }}
    }
}

The system prompt says: "You MUST call respond_to_user. Never write your final answer as plain text."

If the model stops without calling this tool, you return the fallback. No parsing, no regex, no "try to find the JSON in the text." The response either comes through the tool (guaranteed valid schema) or it doesn't come at all.

if stop_reason != "tool_use":
    # Model stopped without calling respond_to_user — return fallback
    return {"content": FALLBACK_RESPONSE}
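The extraction itself is just a scan over the assistant message's content blocks. A minimal version, assuming the Bedrock Converse message shape:

```python
def extract_respond_to_user(output_message):
    """Find the respond_to_user call in an assistant message and return
    its structured input, or None if the model never called the tool."""
    for block in output_message.get("content", []):
        tool_use = block.get("toolUse", {})
        if tool_use.get("name") == "respond_to_user":
            return tool_use["input"]  # already shaped by the tool's JSON schema
    return None
```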

3. Every tool_use Needs a tool_result

AWS Bedrock, like most LLM APIs, enforces strict message ordering: every tool_use block must have a matching tool_result in the next user message. Break this rule and you get a ValidationException.

This is easy to miss with respond_to_user. The model calls it, you extract the response, and you're done — but you still need to close the pair:

# Model called respond_to_user — we have our answer
final_response = extract_respond_to_user(output_message)

# But we STILL need to append the matching tool_result
messages.append({
    "role": "user",
    "content": [{
        "toolResult": {
            "toolUseId": tool_use_id,
            "content": [{"json": {"status": "delivered"}}],
        }
    }],
})

Rule: if you append an assistant message containing toolUse, you must append a user message containing the matching toolResult before anything else.
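One way to catch pairing mistakes before Bedrock's ValidationException does is a small invariant check over the history (my own sketch, not from the orchestrator):

```python
def check_tool_pairing(messages):
    """Raise if any assistant toolUse lacks a matching toolResult
    in the immediately following user message."""
    for i, msg in enumerate(messages):
        use_ids = [b["toolUse"]["toolUseId"]
                   for b in msg.get("content", []) if "toolUse" in b]
        if not use_ids:
            continue
        next_content = messages[i + 1]["content"] if i + 1 < len(messages) else []
        result_ids = {b["toolResult"]["toolUseId"]
                      for b in next_content if "toolResult" in b}
        missing = [u for u in use_ids if u not in result_ids]
        if missing:
            raise ValueError(f"toolUse without toolResult: {missing}")
```

Running it before every converse call turns a confusing API error into a pointed exception naming the orphaned tool call.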


4. Make It Stateless Per Request

The first version had self.messages on a singleton orchestrator. Every user's messages accumulated into one shared list. User A's clinical notes query polluted User B's context. Token counts grew without bound.

Fix: make messages a local variable.

# BEFORE (broken) — messages persist across requests
class AgentOrchestrator:
    def __init__(self):
        self.messages = []  # shared across ALL users and requests

    async def process_message(self, user_message):
        self.messages.append({"role": "user", "content": user_message})
        # ... tokens grow forever

# AFTER (correct) — fresh messages per request
async def process_message(self, user_message):
    messages = [
        {"role": "user", "content": [{"text": user_message}]}
    ]
    # messages is local — garbage collected when method returns

Token usage dropped from ~181K to ~20-30K per request.


5. Truncate Tool Results Before Sending Back

A clinical note can be 13,000 characters. A query might return 200 rows. If you feed all of that back to the LLM, you blow out the context window and pay for tokens the model doesn't need.

Two levels of truncation:

import json

MAX_RESULT_CHARS = 4000
MAX_FIELD_CHARS = 500

def _truncate_result(result):
    # Level 1: truncate individual long string fields
    if "rows" in result:
        for row in result["rows"]:
            for key, value in row.items():
                if isinstance(value, str) and len(value) > MAX_FIELD_CHARS:
                    row[key] = value[:MAX_FIELD_CHARS] + "... [truncated]"

    # Level 2: remove rows from the end until total size fits
    serialized = json.dumps(result, default=str)
    if len(serialized) > MAX_RESULT_CHARS and "rows" in result:
        result["truncated"] = True
        result["note"] = "Result truncated. Use aggregation queries for full data."
        while result["rows"] and len(json.dumps(result, default=str)) > MAX_RESULT_CHARS:
            result["rows"].pop()

    return result

The note field is key — it tells the model why data is missing and what to do about it (use GROUP BY / COUNT instead of selecting raw rows).


6. Strip Metadata Columns

Data warehouses have ETL audit columns (_creation_instant, _is_deleted, _is_inferred) that are noise for answering questions. Filter them before truncation so you don't waste your 4K char budget on junk:

def _strip_metadata_columns(result):
    if "columns" not in result or "rows" not in result:
        return result
    keep = [c for c in result["columns"] if not c.startswith("_")]
    result["columns"] = keep
    result["rows"] = [
        {k: v for k, v in row.items() if k in keep}
        for row in result["rows"]
    ]
    return result

7. Detect When the Agent Is Stuck

Models get stuck in failure loops — retrying the same broken SQL pattern over and over. Track consecutive failures and inject a nudge:

consecutive_query_failures = 0

# After each tool execution:
if query_failed:
    consecutive_query_failures += 1
else:
    consecutive_query_failures = 0

# When stuck:
if consecutive_query_failures >= 2:
    tool_results.append({
        "text": "You have failed 2 consecutive queries. Step back and "
                "reconsider your approach entirely — search for different "
                "tables, check schemas again, or simplify the query."
    })

This is injected as an extra text block alongside the tool results, so the model sees it in the same turn. It's more effective than hoping the model will self-correct.
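The counter and nudge above can be bundled into a small helper (a sketch; the class name and threshold parameter are mine):

```python
class StuckDetector:
    """Track consecutive query failures and emit a nudge when stuck."""

    def __init__(self, threshold=2):
        self.threshold = threshold
        self.failures = 0

    def record(self, query_failed):
        # Any success resets the streak; failures accumulate
        self.failures = self.failures + 1 if query_failed else 0

    def nudge(self):
        """Extra text block to append alongside tool results, or None."""
        if self.failures < self.threshold:
            return None
        return {"text": (
            f"You have failed {self.failures} consecutive queries. Step back "
            "and reconsider your approach entirely: search for different "
            "tables, check schemas again, or simplify the query."
        )}
```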


8. Inject Learnings at Error Time, Not in the System Prompt

The agent has a save_learning tool that writes to a markdown file — SQL dialect quirks, column type gotchas, table relationships. First instinct: inject all learnings into the system prompt on every request.

Problem: learnings grow, system prompt grows, every request pays for tokens the model doesn't need.

Better: inject learnings only when a query fails — the moment they're actually useful:

except Exception as e:
    learnings = load_learnings()
    return {
        "error": str(e),
        "guidance": "Review these learnings for known patterns, then try a different approach.",
        "learnings": learnings if learnings else None,
    }

This is "progressive disclosure" — context only when relevant.
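The post doesn't show load_learnings itself; a minimal file-backed version might look like this (the learnings.md path is my assumption):

```python
from pathlib import Path

LEARNINGS_PATH = Path("learnings.md")  # assumed location of the learnings file

def save_learning(note):
    """Append one learning as a markdown bullet."""
    with LEARNINGS_PATH.open("a", encoding="utf-8") as f:
        f.write(f"- {note}\n")

def load_learnings():
    """Return all accumulated learnings, or "" when none have been saved yet."""
    if LEARNINGS_PATH.exists():
        return LEARNINGS_PATH.read_text(encoding="utf-8")
    return ""
```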


9. Sanitize Types at the Boundary

Database results contain Decimal, datetime, date — types that don't serialize to JSON. The LLM API rejects them. Sanitize everything before constructing tool results:

from datetime import date, datetime
from decimal import Decimal

def _sanitize(obj):
    if isinstance(obj, Decimal):
        return float(obj)
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    if isinstance(obj, dict):
        return {k: _sanitize(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [_sanitize(v) for v in obj]
    return obj

# Apply at the boundary:
result = {"toolResult": {"content": [{"json": _sanitize(tool_output)}]}}

10. Run Tools in Parallel

When the model returns multiple tool calls in one response, run them concurrently:

import asyncio

async def _handle_tool_use(self, content_blocks):
    tool_uses = [
        block["toolUse"] for block in content_blocks
        if "toolUse" in block
    ]

    async def _run(tu):
        result = await asyncio.to_thread(
            self._execute_tool, tu["name"], tu["input"]
        )
        return {
            "toolResult": {
                "toolUseId": tu["toolUseId"],
                "content": [{"json": result}],
            }
        }

    results = await asyncio.gather(*[_run(tu) for tu in tool_uses])
    return list(results)

asyncio.to_thread wraps the blocking DB/API calls so they don't block the event loop. When the model calls search_tables and get_table_schema simultaneously, both run at once.


11. Add Per-Request Telemetry

You can't improve what you can't measure. Track iterations, tool calls, tokens, and wall-clock time:

import logging
import time

logger = logging.getLogger(__name__)

class _Telemetry:
    def __init__(self):
        self.start_time = time.perf_counter()
        self.total_tokens_in = 0
        self.total_tokens_out = 0
        self.tool_counts = {}
        self.iterations = 0

    def record_iteration(self, response):
        self.iterations += 1
        usage = response.get("usage", {})
        self.total_tokens_in += usage.get("inputTokens", 0)
        self.total_tokens_out += usage.get("outputTokens", 0)

    def record_tool(self, name):
        self.tool_counts[name] = self.tool_counts.get(name, 0) + 1

    def log_summary(self):
        elapsed = time.perf_counter() - self.start_time
        logger.info(
            "Request complete | iterations=%d | tools=%s | "
            "tokens_in=%d tokens_out=%d | elapsed=%.1fs",
            self.iterations, self.tool_counts,
            self.total_tokens_in, self.total_tokens_out, elapsed,
        )

Log output:

Request complete | iterations=4 | tools={'search_tables': 1, 'get_table_schema': 2, 'query_data': 1} | tokens_in=12340 tokens_out=2100 | elapsed=3.2s

This immediately tells you when something is wrong — 15 iterations means the agent is lost, 200K tokens means results aren't being truncated.
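With token counts logged, a rough per-request cost falls out directly. A sketch (the default rates are illustrative dollars-per-million-tokens figures, not current Bedrock pricing):

```python
def estimate_cost_usd(tokens_in, tokens_out, rate_in=3.0, rate_out=15.0):
    """Approximate request cost in USD. Default rates are illustrative;
    check your model's current Bedrock pricing page."""
    return tokens_in / 1_000_000 * rate_in + tokens_out / 1_000_000 * rate_out
```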


12. Prompt Caching Saves 90% on Repeat Iterations

Within a single request, the system prompt and tool definitions are identical across iterations 1, 2, 3, etc. AWS Bedrock supports cachePoint markers so iterations 2+ read the cached prefix at 10% of the input cost:

# In the bedrock client:
kwargs["system"] = [
    {"text": system_prompt},
    {"cachePoint": {"type": "default"}},
]

tools = tool_definitions + [{"cachePoint": {"type": "default"}}]
kwargs["toolConfig"] = {"tools": tools}

The system prompt + tool definitions exceed the 1,024-token minimum threshold for caching. This is free money — no code change to the agent logic, just two markers in the API call.
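To confirm caching is actually kicking in, the Converse response's usage block reports cache activity. A sketch (field names follow Bedrock's prompt-caching documentation; verify them against your SDK version):

```python
def cache_stats(response):
    """Summarize prompt-cache activity from a Converse response."""
    usage = response.get("usage", {})
    return {
        "cache_read": usage.get("cacheReadInputTokens", 0),    # billed at the reduced rate
        "cache_write": usage.get("cacheWriteInputTokens", 0),  # billed at a premium
        "uncached_in": usage.get("inputTokens", 0),
    }
```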


The Checklist

Bounded loop with fallback: prevents runaway token spend
Structured output via tool: guarantees parseable responses
tool_result for every tool_use: keeps conversation history valid
Stateless per request: prevents cross-user pollution, bounds tokens
Result truncation: protects the context window
Strip metadata columns: don't waste token budget on noise
Stuck detection + nudge: breaks failure loops early
Learnings at error time: context only when relevant
Type sanitization: prevents serialization crashes
Parallel tool execution: reduces latency
Per-request telemetry: enables diagnosis and optimization
Prompt caching: 90% cost reduction on repeat iterations

What I'd Do Differently

Start stateless. The cross-request accumulation bug was obvious in hindsight. If your orchestrator is a singleton (which it should be for a FastAPI app), conversation state cannot live on the instance.

Truncate from day one. We discovered the 181K token problem only after looking at the Bedrock bill. If we'd added _truncate_result on day one, the bill would have been a fraction.

Log telemetry before optimizing. The telemetry class was the first thing that actually showed us where tokens were going. Without it, we were guessing.

All the code above is from one file — orchestrator.py — running in production at a cancer research hospital. The patterns are general enough to apply to any agentic tool-use loop, regardless of the LLM provider or domain.
