
Events (on_events)

New in v0.9

React to events in your agent's execution flow — logging, monitoring, reflection, custom behavior at every step.

12 Event Types
after_user_input: Fires once per turn, after user input is added
before_iteration: Before each iteration starts — poll IO, check mode changes
before_llm: Before each LLM call — inject context, modify messages
after_llm: After each LLM response — track latency, log outputs
before_each_tool: Before each individual tool execution
before_tools: Once before all tools in a batch fire
after_each_tool: After each individual tool — logging only, don't add messages
after_tools: Once after all tools complete — safe place to add messages
on_error: When tool execution fails or a tool is not found
after_iteration: After each iteration — checkpoints, can stop the loop
on_stop_signal: When stop_signal is set — cleanup interrupted operations
on_complete: Fires once after agent finishes the task

Quick Start

Adding event handlers takes three steps: define a handler function, wrap it in an event type, and pass it to your agent.

main.py
from connectonion import Agent, after_llm

def log_llm_calls(agent):
    """Track LLM performance"""
    trace = agent.current_session['trace'][-1]
    if trace['type'] == 'llm_call':
        duration = trace['duration_ms']
        print(f"⚡ LLM call: {duration:.0f}ms")

agent = Agent(
    "assistant",
    tools=[search],
    on_events=[after_llm(log_llm_calls)]
)

agent.input("Search for Python")
output
⚡ LLM call: 1204ms
⚡ LLM call: 831ms
"I found results for Python..."

Tip: Event handlers receive the agent instance, giving you full access to current_session, messages, trace, and more.

Group multiple handlers

You can pass multiple handlers to the same event type:

main.py
def check_shell(agent): ...
def check_email(agent): ...

agent = Agent(
    "assistant",
    on_events=[
        before_each_tool(check_shell, check_email),  # group handlers for same event
    ]
)
output
# Both handlers fire before each tool execution
# Cleaner than listing them separately

Note: Decorator Syntax
You can also use @before_each_tool decorator instead of before_each_tool(fn). We recommend wrapper style because it's easier for LLMs to understand when reading your code. But if you prefer decorators, they work too.
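The two styles are interchangeable because `@decorator` is just `decorator(fn)`. A minimal sketch of why, using a stand-in `EventHandler` and `before_each_tool` (these internals are assumed for illustration, not ConnectOnion's actual implementation):

```python
from dataclasses import dataclass
from typing import Callable, List

@dataclass
class EventHandler:
    event: str
    funcs: List[Callable]

def before_each_tool(*funcs):
    # Usable as a wrapper, before_each_tool(fn), and as @before_each_tool,
    # since @before_each_tool just calls before_each_tool(fn)
    return EventHandler('before_each_tool', list(funcs))

# Wrapper style
def check_shell(agent): ...
handler = before_each_tool(check_shell)

# Decorator style: produces the same EventHandler object
@before_each_tool
def check_email(agent): ...
```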

All Event Types

Here's when each event fires and what you can do with it:

§ I · User Input
1 event

after_user_input

Fires once per turn, after user input is added

main.py
def add_timestamp(agent):
    from datetime import datetime
    timestamp = datetime.now().strftime("%H:%M:%S")
    agent.current_session['messages'].append({
        'role': 'system',
        'content': f'Current time: {timestamp}'
    })

agent = Agent("assistant", on_events=[
    after_user_input(add_timestamp)
])
output
# LLM now sees timestamp in context
# Useful for: time-aware agents, logging, session metadata
§ II · Per-Iteration
4 events

before_iteration

Fires before each iteration starts (poll IO, check mode changes)

main.py
def check_mode_change(agent):
    """Poll for mode changes before iteration"""
    # Check if IO has mode change request
    if hasattr(agent.io, 'poll_mode_change'):
        new_mode = agent.io.poll_mode_change()
        if new_mode:
            agent.current_session['mode'] = new_mode
            print(f"🔄 Mode changed to: {new_mode}")

agent = Agent("assistant", on_events=[
    before_iteration(check_mode_change)
])
output
# Fires before each iteration loop starts
# Useful for: polling IO, mode changes, iteration setup

after_llm

Fires after each LLM response (multiple times per turn)

main.py
from connectonion import llm_do

def add_reflection(agent):
    """Add AI-generated reflection after tools execute"""
    trace = agent.current_session['trace']
    # Find tool executions since the previous LLM call
    recent_tools = []
    llm_count = 0
    for entry in reversed(trace):
        if entry.get('type') == 'llm_call':
            llm_count += 1
            if llm_count >= 2:
                break
        elif entry.get('type') == 'tool_execution':
            recent_tools.append(entry)
    if recent_tools:
        result = recent_tools[0]['result'][:200]
        reflection = llm_do(
            f"Reflect on this result: {result}",
            model="gpt-4o-mini"
        )
        # Inject as assistant message (safe timing after tools)
        agent.current_session['messages'].append({
            'role': 'assistant',
            'content': f"💭 {reflection}"
        })

agent = Agent("assistant", tools=[search], on_events=[
    after_llm(add_reflection)
])
output
💭 The search results provide comprehensive information about AI...
# Useful for: reflection, chain-of-thought, meta-cognition
§ III · Tool Execution
5 events

before_each_tool

Fires before EACH individual tool execution

main.py
def validate_tool(agent):
    """Validate tool before execution"""
    pending = agent.current_session['pending_tool']
    tool_name = pending['name']
    print(f"🔧 About to run: {tool_name}")
    # Raise an exception here to cancel execution

agent = Agent("assistant", tools=[search], on_events=[
    before_each_tool(validate_tool)
])
output
🔧 About to run: search
# Useful for: validation, approval prompts, logging

before_tools

Fires ONCE before ALL tools in a batch

main.py
def log_batch_start(agent):
    """Log start of tool execution batch"""
    print("🔄 Starting tool execution batch...")

agent = Agent("assistant", tools=[search, analyze], on_events=[
    before_tools(log_batch_start)
])
output
🔄 Starting tool execution batch...
# Useful for: batch validation, batch-level logging

after_each_tool

Fires after EACH tool (logging only, NOT for messages)

WARNING: Do NOT add messages here! This breaks Anthropic Claude's API message ordering.

main.py
def log_tool_timing(agent):
    """Log each tool's execution time"""
    trace = agent.current_session['trace'][-1]
    if trace['type'] == 'tool_execution':
        timing = trace['timing']
        print(f"🔧 {trace['tool_name']}: {timing:.0f}ms")

agent = Agent("assistant", tools=[search, analyze], on_events=[
    after_each_tool(log_tool_timing)
])
output
🔧 search: 245ms
🔧 analyze: 1842ms
# Useful for: timing, performance logging

after_tools

Fires ONCE after ALL tools complete (safe for messages)

SAFE: This is the correct place to add reflection messages after tools.

main.py
def add_reflection(agent):
    """Add reflection after all tools complete"""
    trace = agent.current_session['trace']
    recent_tools = [t for t in trace if t['type'] == 'tool_execution'][-3:]
    if recent_tools:
        agent.current_session['messages'].append({
            'role': 'assistant',
            'content': f"Completed {len(recent_tools)} tools"
        })

agent = Agent("assistant", tools=[search, analyze], on_events=[
    after_tools(add_reflection)
])
output
Completed 2 tools
# Useful for: reflection, summarization, message injection

on_error

Fires when tool execution fails or tool not found

main.py
def handle_errors(agent):
    """Custom error handling"""
    trace = agent.current_session['trace'][-1]
    if trace.get('status') in ('error', 'not_found'):
        error = trace.get('error', 'Unknown error')
        print(f"❌ Error: {error}")
        # Log to monitoring service
        # Add recovery instructions to messages
        # Implement retry logic

agent = Agent("assistant", tools=[api_call], on_events=[
    on_error(handle_errors)
])
output
❌ Error: API rate limit exceeded
# Useful for: error logging, retry logic, fallback behavior
§ IV · Lifecycle
3 events

after_iteration

Fires after each iteration (checkpoints, can control loop continuation)

main.py
def save_checkpoint(agent):
    """Save checkpoint after each iteration"""
    iteration = agent.current_session['iteration']
    trace = agent.current_session['trace']
    # Save state to file
    checkpoint = {
        'iteration': iteration,
        'messages': agent.current_session['messages'],
        'trace': trace
    }
    save_to_file(f'.co/checkpoint_{iteration}.json', checkpoint)
    print(f"💾 Checkpoint saved: iteration {iteration}")

agent = Agent("assistant", tools=[search], on_events=[
    after_iteration(save_checkpoint)
])
output
💾 Checkpoint saved: iteration 1
💾 Checkpoint saved: iteration 2
# Useful for: checkpoints, stopping loop, tracking iterations

on_stop_signal

Fires when stop_signal is set (cleanup interrupted operations)

Note: Mutually exclusive with on_complete - either this fires (interrupted) OR on_complete fires (normal completion), never both.

main.py
def cleanup_interrupted_work(agent):
    """Cleanup when user interrupts operation"""
    trace = agent.current_session['trace']
    # Rollback files written this turn
    files_modified = [
        t['args']['file_path'] for t in trace
        if t.get('name') == 'write' and t.get('status') == 'success'
    ]
    for file_path in files_modified:
        restore_from_backup(file_path)
        print(f"⏪ Rolled back: {file_path}")
    print("✅ Cleanup complete - ready for new input")

agent = Agent("assistant", tools=[write, read], on_events=[
    on_stop_signal(cleanup_interrupted_work)
])
output
⏪ Rolled back: config.json
✅ Cleanup complete - ready for new input
# Useful for: rollback, cleanup, save checkpoints, notify user

on_complete

Fires once after agent finishes task

main.py
def log_completion(agent):
    """Log task completion with stats"""
    trace = agent.current_session['trace']
    llm_calls = sum(1 for t in trace if t['type'] == 'llm_call')
    tool_calls = sum(1 for t in trace if t['type'] == 'tool_execution')
    errors = sum(1 for t in trace if t.get('status') == 'error')
    print(f"✅ Task complete: {llm_calls} LLM calls, {tool_calls} tools, {errors} errors")

agent = Agent("assistant", tools=[search], on_events=[
    on_complete(log_completion)
])
output
✅ Task complete: 2 LLM calls, 1 tools, 0 errors
# Useful for: metrics, cleanup, notifications, logging

Combining Multiple Events

Use multiple event handlers together for comprehensive monitoring and control:

main.py
from connectonion import (
    Agent, after_user_input, before_iteration, after_llm, after_each_tool,
    on_error, after_iteration, on_stop_signal, on_complete
)
from datetime import datetime

def log_session_start(agent):
    print(f"📝 Session started at {datetime.now()}")

def check_iteration(agent):
    iteration = agent.current_session.get('iteration', 0)
    print(f"🔄 Starting iteration {iteration}")

def track_llm(agent):
    trace = agent.current_session['trace'][-1]
    if trace['type'] == 'llm_call':
        print(f"⚡ LLM: {trace['duration_ms']:.0f}ms")

def track_tools(agent):
    trace = agent.current_session['trace'][-1]
    if trace['type'] == 'tool_execution':
        print(f"🔧 Tool: {trace['tool_name']}")

def handle_errors(agent):
    trace = agent.current_session['trace'][-1]
    print(f"❌ Error: {trace.get('error')}")

def save_checkpoint(agent):
    iteration = agent.current_session.get('iteration', 0)
    print(f"💾 Checkpoint: iteration {iteration}")

def handle_interruption(agent):
    print("⚠️ Operation interrupted - cleaning up")

def log_completion(agent):
    print("✅ Task complete")

agent = Agent(
    "full_monitoring",
    tools=[search, analyze],
    on_events=[
        after_user_input(log_session_start),
        before_iteration(check_iteration),
        after_llm(track_llm),
        after_each_tool(track_tools),
        on_error(handle_errors),
        after_iteration(save_checkpoint),
        on_stop_signal(handle_interruption),
        on_complete(log_completion)
    ]
)

agent.input("Search and analyze Python")
output
📝 Session started at 2025-01-04 15:30:42
⚡ LLM: 1204ms
🔧 Tool: search
⚡ LLM: 831ms
🔧 Tool: analyze
⚡ LLM: 1142ms
✅ Task complete
"Analysis complete..."

Key Concepts

Event Handler Signature

All event handlers receive the agent instance:

main.py
def my_event_handler(agent: Agent) -> None:
    # Access agent state
    messages = agent.current_session['messages']
    trace = agent.current_session['trace']
    user_prompt = agent.current_session['user_prompt']
    iteration = agent.current_session['iteration']

    # Modify agent state
    messages.append({'role': 'system', 'content': 'Context'})

    # Access agent attributes
    tool_names = agent.list_tools()
    model = agent.llm.model
output
# Event handlers are regular Python functions
# Full access to agent internals
# Can read AND modify agent state

Message Injection Timing

Important: Use after_tools to inject messages after tool execution:

Don't use after_each_tool: Injecting messages during tool execution breaks Anthropic Claude's message sequence (all tool_results must follow tool_use)

Use after_tools: Fires once after ALL tool results are added to messages, safe for reflection injection
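The ordering constraint can be sketched with a toy checker; the message shapes below are simplified stand-ins, not the real Anthropic wire format:

```python
def results_follow_tool_use(messages):
    """Toy check: every tool_use must be immediately followed by its tool_result."""
    for i, msg in enumerate(messages):
        if msg['type'] == 'tool_use':
            if i + 1 >= len(messages) or messages[i + 1]['type'] != 'tool_result':
                return False
    return True

# Injecting from after_each_tool can land an assistant message between
# a tool_use and its tool_result, which the API rejects:
broken = [
    {'type': 'tool_use'},
    {'type': 'assistant_text'},  # injected too early
    {'type': 'tool_result'},
]

# after_tools waits until every tool_result is already in place:
ok = [
    {'type': 'tool_use'},
    {'type': 'tool_result'},
    {'type': 'assistant_text'},  # injected after the batch, fine
]
```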

Error Handling

Event handlers follow fail-fast principle:

main.py
def failing_event(agent):
    raise RuntimeError("Event failed")

agent = Agent("test", on_events=[
    after_llm(failing_event)
])

agent.input("test")  # Raises RuntimeError
output
RuntimeError: Event failed
# Exceptions propagate - agents stop on event errors
# Design events to be robust or handle exceptions internally

Real-World Use Cases

1. Performance Monitoring Dashboard

main.py
class PerformanceMonitor:
    def __init__(self):
        self.metrics = {
            'llm_calls': 0,
            'tool_calls': 0,
            'total_llm_time': 0,
            'total_tool_time': 0,
            'errors': 0
        }

    def track_llm(self, agent):
        trace = agent.current_session['trace'][-1]
        if trace['type'] == 'llm_call':
            self.metrics['llm_calls'] += 1
            self.metrics['total_llm_time'] += trace['duration_ms']

    def track_tool(self, agent):
        trace = agent.current_session['trace'][-1]
        if trace['type'] == 'tool_execution':
            self.metrics['tool_calls'] += 1
            self.metrics['total_tool_time'] += trace['timing']

    def track_error(self, agent):
        self.metrics['errors'] += 1

    def report(self):
        print(f"LLM calls: {self.metrics['llm_calls']}")
        print(f"Avg LLM time: {self.metrics['total_llm_time'] / max(1, self.metrics['llm_calls']):.0f}ms")
        print(f"Tool calls: {self.metrics['tool_calls']}")
        print(f"Errors: {self.metrics['errors']}")

monitor = PerformanceMonitor()

agent = Agent("monitored", tools=[search], on_events=[
    after_llm(monitor.track_llm),
    after_each_tool(monitor.track_tool),
    on_error(monitor.track_error)
])

agent.input("Complex task...")
monitor.report()
output
LLM calls: 3
Avg LLM time: 1245ms
Tool calls: 2
Errors: 0

2. Automatic Context Injection

main.py
def inject_company_context(agent):
    """Add company-specific context to every query"""
    agent.current_session['messages'].append({
        'role': 'system',
        'content': '''You are a customer support agent for Acme Corp.
- Be friendly and professional
- Reference our 30-day return policy
- Escalate billing issues to finance team'''
    })

agent = Agent(
    "support_agent",
    tools=[search_knowledge_base, create_ticket],
    on_events=[after_user_input(inject_company_context)]
)
output
# Every user query now includes company context
# LLM follows company policies automatically
# No need to repeat instructions in every prompt

3. Smart Retry Logic

main.py
class RetryHandler:
    def __init__(self, max_retries=3):
        self.max_retries = max_retries
        self.retry_count = {}

    def handle_error(self, agent):
        trace = agent.current_session['trace'][-1]
        tool_name = trace.get('tool_name')
        # Track retries per tool
        if tool_name not in self.retry_count:
            self.retry_count[tool_name] = 0
        self.retry_count[tool_name] += 1
        if self.retry_count[tool_name] < self.max_retries:
            # Add retry instruction to messages
            agent.current_session['messages'].append({
                'role': 'system',
                'content': f'Previous {tool_name} failed. Try with different parameters.'
            })
            print(f"🔄 Retry {self.retry_count[tool_name]}/{self.max_retries}")
        else:
            print(f"❌ Max retries reached for {tool_name}")

retry_handler = RetryHandler()

agent = Agent("resilient", tools=[flaky_api], on_events=[
    on_error(retry_handler.handle_error)
])
output
🔄 Retry 1/3
🔄 Retry 2/3
✓ Success on retry 2

API Reference

Event Wrapper Functions

after_user_input(func: Callable[[Agent], None]) → EventHandler

Fires once per turn after user input is added to session.

before_iteration(func: Callable[[Agent], None]) → EventHandler

Fires before each iteration starts (poll IO, check mode changes).

before_llm(func: Callable[[Agent], None]) → EventHandler

Fires before each LLM call.

after_llm(func: Callable[[Agent], None]) → EventHandler

Fires after each LLM response.

before_each_tool(func: Callable[[Agent], None]) → EventHandler

Fires before EACH individual tool execution. Access pending tool via agent.current_session['pending_tool'].

before_tools(func: Callable[[Agent], None]) → EventHandler

Fires ONCE before ALL tools in a batch execute.

after_each_tool(func: Callable[[Agent], None]) → EventHandler

Fires after EACH individual tool. WARNING: Do NOT add messages here!

after_tools(func: Callable[[Agent], None]) → EventHandler

Fires ONCE after ALL tools complete. SAFE for adding messages.

on_error(func: Callable[[Agent], None]) → EventHandler

Fires when tool execution fails or tool is not found.

after_iteration(func: Callable[[Agent], None]) → EventHandler

Fires after each iteration (checkpoints, can control loop continuation).

on_stop_signal(func: Callable[[Agent], None]) → EventHandler

Fires when stop_signal is set (cleanup interrupted operations). Mutually exclusive with on_complete.

on_complete(func: Callable[[Agent], None]) → EventHandler

Fires once after agent completes the task.

Agent Constructor

Agent(name, tools, on_events: Optional[List[EventHandler]] = None, ...)

on_events: List of event handlers wrapped with event type functions

Best Practices

Keep handlers simple: Each event handler should do one thing well. Compose multiple handlers for complex behavior.

Use after_tools for message injection: This is the safe time to inject reflection/context after ALL tools in a batch complete.

Handle exceptions internally: If your event handler can fail, catch exceptions to prevent stopping the agent.
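One pattern for this is a small wrapper that catches and logs handler failures; a minimal sketch in plain Python (the `safe` helper is ours, not part of ConnectOnion):

```python
import functools

def safe(handler):
    """Run a handler, logging failures instead of letting them stop the agent."""
    @functools.wraps(handler)
    def wrapped(agent):
        try:
            handler(agent)
        except Exception as exc:
            print(f"⚠️ {handler.__name__} failed: {exc}")
    return wrapped

@safe
def flaky_metrics(agent):
    raise RuntimeError("metrics backend unreachable")

flaky_metrics(None)  # prints a warning instead of raising
```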

Don't inject during tool execution: Using after_each_tool to inject messages breaks Anthropic Claude's tool_result message ordering.

Don't do heavy computation: Event handlers run synchronously and block agent execution. Keep them fast.
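One way to keep a handler fast is to hand slow work to a background worker and return immediately; a minimal stdlib sketch (the `track_llm` handler body is hypothetical, and the `join`/`print` at the end exist only for the demo):

```python
import queue
import threading

events = queue.Queue()
uploaded = []

def uploader():
    # Stand-in for slow work: network upload, DB write, etc.
    while True:
        item = events.get()
        uploaded.append(item)
        events.task_done()

threading.Thread(target=uploader, daemon=True).start()

def track_llm(agent):
    # Handler stays fast: enqueue and return immediately
    events.put('llm_call')

track_llm(None)
track_llm(None)
events.join()    # wait for the worker to drain the queue
print(uploaded)  # ['llm_call', 'llm_call']
```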

Next Steps

Star us on GitHub

If ConnectOnion saves you time, a ⭐ goes a long way — and earns you a coffee chat with our founder.