Chain of Thoughts and Progress Tracking¶
Ask RITA provides two observability mechanisms: Chain of Thoughts (CoT) for detailed step-by-step reasoning traces, and Progress Callbacks for lightweight step status updates.
Table of Contents¶
- Overview
- Chain of Thoughts
- Progress Callbacks
- Configuration
- Usage Examples
- API Reference
- How It Works
- Troubleshooting
Overview¶
| Mechanism | Purpose | Data Shape | Consumer |
|---|---|---|---|
| CoT Listeners | Detailed step reasoning, timing, confidence | Event dicts with step details | SSE / WebSocket / UI streaming |
query_with_cot() |
Typed CoT output with reasoning summary | ChainOfThoughtsOutput Pydantic model |
API responses |
WorkflowState.chain_of_thoughts |
Full step trace attached to result | Dict with summary + detailed steps | Post-query analysis |
| Progress Callbacks | Lightweight step status updates | ProgressData dataclass |
Simple progress UIs / console |
Chain of Thoughts¶
Enabling CoT¶
CoT tracking is enabled by default. Configure it under the chain_of_thoughts key:
chain_of_thoughts:
enabled: true
include_timing: true
include_confidence: true
include_step_details: false # Advanced: shows detailed step data
track_retries: true
max_reasoning_length: 500 # Max characters for reasoning text
CoT Listeners¶
Register a listener to receive real-time events as each workflow step completes:
from askrita import SQLAgentWorkflow, ConfigManager
config = ConfigManager("config.yaml")
workflow = SQLAgentWorkflow(config)
def cot_listener(event):
event_type = event["event_type"]
if event_type == "cot_step_completed":
step = event["cot_step"]
print(f"Step: {step['step_name']}")
print(f" Status: {step['status']}")
print(f" Duration: {step.get('duration_ms', 'N/A')}ms")
print(f" Reasoning: {step.get('reasoning', '')}")
elif event_type == "cot_workflow_completed":
summary = event["summary"]
print(f"\nWorkflow complete: {summary['overall_success']}")
print(f"Total steps: {summary['total_steps']}")
workflow.register_cot_listener(cot_listener)
result = workflow.query("What are total sales by region?")
Event Types¶
cot_step_completed — Emitted after each workflow step:
| Key | Type | Description |
|---|---|---|
event_type |
str |
"cot_step_completed" |
step_name |
str |
Name of the step |
cot_step |
dict |
Full step details (see below) |
details |
dict |
Step-specific metadata |
progress_data |
dict |
Progress data for the step |
cot_workflow_completed — Emitted when the workflow finishes:
| Key | Type | Description |
|---|---|---|
event_type |
str |
"cot_workflow_completed" |
summary |
dict |
Overall summary with step list |
detailed_steps |
list |
All step details |
workflow_id |
str |
Unique workflow run ID |
CoT Step Fields¶
Each step in the trace contains:
| Field | Type | Description |
|---|---|---|
step_name |
str |
Step identifier (e.g., "generate_sql") |
step_type |
str |
Category (e.g., "llm", "database", "validation") |
status |
str |
"started", "completed", "failed", or "skipped" |
start_time |
float |
Start timestamp |
end_time |
float |
End timestamp |
duration_ms |
float |
Elapsed time in milliseconds |
reasoning |
str |
What happened in this step |
input_summary |
str |
Summary of step input |
output_summary |
str |
Summary of step output |
error_message |
str |
Error details (if failed) |
retry_count |
int |
Number of retries |
confidence_score |
float |
Confidence score (0.0–1.0) |
details |
dict |
Step-specific metadata |
Listener Management¶
# Register
workflow.register_cot_listener(my_listener)
# Unregister
workflow.unregister_cot_listener(my_listener)
# Clear all listeners
workflow.clear_cot_listeners()
query_with_cot()¶
For a typed CoT response, use query_with_cot():
from askrita.models import ChainOfThoughtsOutput
cot_output = workflow.query_with_cot("What are total sales by region?")
# Reasoning breadcrumbs (up to 5 high-level steps)
for step in cot_output.reasoning.steps:
print(f" - {step}")
# SQL draft
print(f"SQL: {cot_output.sql.sql}")
print(f"Confidence: {cot_output.sql.confidence}")
# Execution result
if cot_output.result:
print(f"Rows: {cot_output.result.row_count}")
# Visualization recommendation
if cot_output.viz:
print(f"Chart: {cot_output.viz.kind}")
ChainOfThoughtsOutput¶
class ChainOfThoughtsOutput(BaseModel):
reasoning: ReasoningSummary # High-level step bullets
sql: SqlDraft # Generated SQL + confidence
result: Optional[ExecutionResult]
viz: Optional[VisualizationSpec]
Supporting models:
class ReasoningSummary(BaseModel):
steps: List[str] # e.g., ["Parsed question", "Generated SQL", ...]
class SqlDraft(BaseModel):
sql: str
confidence: float
class ExecutionResult(BaseModel):
rows: list
columns: list
row_count: int
class VisualizationSpec(BaseModel):
kind: str # Chart type
x: Optional[str]
y: Optional[str]
series: Optional[str]
options: Optional[dict]
When the query needs clarification, query_with_cot() returns a ClarificationQuestion instead:
Accessing CoT from WorkflowState¶
After a query, the full CoT trace is attached to the result:
result = workflow.query("What are total sales?")
cot = result.chain_of_thoughts
if cot:
summary = cot.get("summary", {})
print(f"Steps: {summary.get('total_steps')}")
print(f"Success: {summary.get('overall_success')}")
for step in cot.get("detailed_steps", []):
print(f" {step['step_name']}: {step['status']} ({step.get('duration_ms', 0):.0f}ms)")
Progress Callbacks¶
Progress callbacks provide a simpler alternative to CoT listeners for tracking step status.
Basic Usage¶
from askrita import SQLAgentWorkflow, ConfigManager
def on_progress(progress_data):
print(f"[{progress_data.status}] {progress_data.message}")
config = ConfigManager("config.yaml")
workflow = SQLAgentWorkflow(config, progress_callback=on_progress)
result = workflow.query("How many customers?")
Built-In Console Callback¶
Ask RITA includes a ready-made console callback:
from askrita.sqlagent.progress_tracker import create_simple_progress_callback
callback = create_simple_progress_callback()
workflow = SQLAgentWorkflow(config, progress_callback=callback)
result = workflow.query("Total revenue by month")
# Prints step-by-step progress to console
ProgressData¶
@dataclass
class ProgressData:
step_name: str # e.g., "generate_sql"
status: str # "started", "completed", "failed", "skipped"
message: str # User-facing message (with emoji)
error: Optional[str] # Error message if failed
step_index: Optional[int]
total_steps: Optional[int]
step_data: Optional[dict]
timestamp: str # ISO timestamp
def to_dict(self) -> dict:
"""Convert to JSON-serializable dict."""
Step Messages¶
Progress messages include emoji indicators for each step:
| Step | Message |
|---|---|
pii_detection |
"Scanning for sensitive data..." |
parse_question |
"Understanding your question..." |
get_unique_nouns |
"Looking up key values..." |
generate_sql |
"Writing SQL query..." |
validate_and_fix_sql |
"Validating SQL..." |
execute_sql |
"Running query..." |
format_results |
"Formatting results..." |
choose_and_format_visualization |
"Creating visualization..." |
generate_followup_questions |
"Generating follow-up questions..." |
Using Both CoT and Progress¶
CoT listeners and progress callbacks are independent — you can use both simultaneously:
def cot_listener(event):
# Detailed reasoning trace (for logging or streaming)
pass
def progress_callback(data):
# Simple status updates (for UI progress bar)
pass
workflow = SQLAgentWorkflow(
config,
progress_callback=progress_callback,
)
workflow.register_cot_listener(cot_listener)
result = workflow.query("Show me the dashboard data")
Configuration¶
Chain of Thoughts Configuration¶
chain_of_thoughts:
enabled: true # Master switch (default: true)
include_timing: true # Include step duration
include_confidence: true # Include confidence scores
include_step_details: false # Include detailed step metadata
track_retries: true # Track retry attempts
max_reasoning_length: 500 # Max chars for reasoning text (50-5000)
display_preferences: # Hints for UI rendering
show_successful_steps: true
show_failed_steps: true
show_skipped_steps: true
collapse_successful_steps: false
highlight_failed_steps: true
show_step_timing: true
show_confidence_scores: true
Configuration Options¶
| Option | Type | Default | Description |
|---|---|---|---|
enabled |
bool |
true |
Enable/disable CoT tracking |
include_timing |
bool |
true |
Record step start/end times and duration |
include_confidence |
bool |
true |
Include confidence scores per step |
include_step_details |
bool |
false |
Include detailed metadata (tokens, latency, etc.) |
track_retries |
bool |
true |
Track retry attempts in step details |
max_reasoning_length |
int |
500 |
Truncation limit for reasoning text (50–5000) |
Display Preferences¶
Display preferences are hints for UI rendering. They do not affect the data collected — they guide how a frontend should present the CoT trace:
| Preference | Default | Description |
|---|---|---|
show_successful_steps |
true |
Display steps that completed successfully |
show_failed_steps |
true |
Display steps that failed |
show_skipped_steps |
true |
Display steps that were skipped |
collapse_successful_steps |
false |
Collapse successful steps by default |
highlight_failed_steps |
true |
Visually highlight failed steps |
show_step_timing |
true |
Show duration for each step |
show_confidence_scores |
true |
Show confidence scores |
Usage Examples¶
Streaming to Server-Sent Events (SSE)¶
from flask import Flask, Response
import json
app = Flask(__name__)
@app.route("/query/stream")
def stream_query():
def generate():
events = []
def listener(event):
events.append(event)
workflow.register_cot_listener(listener)
result = workflow.query(request.args["q"])
workflow.unregister_cot_listener(listener)
for event in events:
yield f"data: {json.dumps(event)}\n\n"
yield f"data: {json.dumps({'type': 'result', 'answer': result.answer})}\n\n"
return Response(generate(), mimetype="text/event-stream")
Logging Step Performance¶
import logging
logger = logging.getLogger("askrita.performance")
def performance_listener(event):
if event["event_type"] == "cot_step_completed":
step = event["cot_step"]
logger.info(
f"Step {step['step_name']}: "
f"{step.get('duration_ms', 0):.0f}ms "
f"(confidence: {step.get('confidence_score', 'N/A')})"
)
elif event["event_type"] == "cot_workflow_completed":
metrics = event["summary"].get("performance_metrics", {})
logger.info(f"Total workflow time: {metrics.get('total_time_ms', 0):.0f}ms")
workflow.register_cot_listener(performance_listener)
Disabling CoT for Performance¶
When disabled, no tracker is created and no listener events are emitted. The chain_of_thoughts field in WorkflowState will be empty.
API Reference¶
SQLAgentWorkflow CoT Methods¶
class SQLAgentWorkflow:
def query_with_cot(self, question: str) -> ChainOfThoughtsOutput: ...
def register_cot_listener(self, listener: Callable[[Dict], None]) -> None: ...
def unregister_cot_listener(self, listener: Callable[[Dict], None]) -> None: ...
def clear_cot_listeners(self) -> None: ...
NoSQLAgentWorkflow CoT Methods¶
The NoSQL workflow supports the same CoT API:
class NoSQLAgentWorkflow:
def register_cot_listener(self, listener: Callable[[Dict], None]) -> None: ...
def unregister_cot_listener(self, listener: Callable[[Dict], None]) -> None: ...
def clear_cot_listeners(self) -> None: ...
Progress Tracking¶
from askrita.sqlagent.progress_tracker import (
ProgressData,
ProgressStatus,
create_simple_progress_callback,
)
# Type alias
ProgressCallback = Callable[[ProgressData], None]
How It Works¶
CoT Tracking Flow¶
flowchart TD
A["_execute_query(question)"] --> B["_init_cot_tracker(question)"]
B -->|"Create EnhancedChainOfThoughtsTracker; register internal listener"| C["_invoke_graph(state)"]
C -->|"Create ChainOfThoughtsCallbackHandler as LangGraph callbacks"| D["For each workflow step"]
D --> E["_track_step(step_name)"]
E -->|"tracker.start_step(); progress_callback(started)"| F["Step executes"]
F --> G["_complete_step(step_name)"]
G -->|"tracker.complete_current_step(); progress_callback(completed); cot_listener(cot_step_completed)"| H{More steps?}
H -->|Yes| D
H -->|No| I["_finalize_cot()"]
I -->|"tracker.finalize_workflow(); cot_listener(cot_workflow_completed); attach chain_of_thoughts to WorkflowState"| J[Done]
style A fill:#2F5496,color:#fff
style B fill:#4CAF50,color:#fff
style C fill:#4CAF50,color:#fff
style D fill:#00897B,color:#fff
style E fill:#4CAF50,color:#fff
style F fill:#4CAF50,color:#fff
style G fill:#4CAF50,color:#fff
style H fill:#FF9800,color:#fff
style I fill:#4CAF50,color:#fff
style J fill:#7B1FA2,color:#fff
Step Registry¶
Ask RITA maintains a global registry of workflow steps with metadata (step type, reasoning templates, descriptions). The registry is initialized at module import with all standard SQL workflow steps. Custom steps can be registered for plugins or extensions.
Troubleshooting¶
CoT Data Missing from WorkflowState¶
Symptom: result.chain_of_thoughts is empty.
- Check that
chain_of_thoughts.enabled: truein your config - CoT is enabled by default; if you explicitly set
enabled: false, it is disabled
Listeners Not Receiving Events¶
Symptom: Registered listener is never called.
- Ensure the listener is registered before calling
query()orchat() - Verify CoT is enabled in configuration
- Check that the listener function signature accepts a single
dictargument
Reasoning Text Truncated¶
Symptom: Reasoning strings end with ... or are cut off.
Increase max_reasoning_length:
Valid range: 50–5000 characters.
Performance Impact¶
CoT tracking adds minimal overhead (microseconds per step for timing and string operations). If you need maximum performance, disable it:
See also:
- Configuration Guide — Complete YAML configuration reference
- Conversational SQL — Chat mode and follow-up questions
- Usage Examples — Workflow usage patterns