ConnectOnion

Hook into agent lifecycle

React to events in your agent's execution flow. Add logging, monitoring, reflection, and custom behavior at every step.

12 Event Types

after_user_input
Fires once per turn
before_iteration
Before each iteration starts
before_llm
Before each LLM call
after_llm
After each LLM response
before_each_tool
Before each individual tool
before_tools
Once before all tools in batch
after_each_tool
After each tool (logging only)
after_tools
Once after all tools (safe for messages)
on_error
When tool execution fails
after_iteration
After iteration (checkpoints, can stop)
on_stop_signal
When interrupted (cleanup)
on_complete
After agent finishes task

Quick Start

Add event handlers to your agent in three steps: define a handler function, wrap it with an event type, and pass it via on_events:

main.py
```python
from connectonion import Agent, after_llm

def log_llm_calls(agent):
    """Track LLM performance"""
    trace = agent.current_session['trace'][-1]
    if trace['type'] == 'llm_call':
        duration = trace['duration_ms']
        print(f"โšก LLM call: {duration:.0f}ms")

agent = Agent(
    "assistant",
    tools=[search],
    on_events=[after_llm(log_llm_calls)]
)

agent.input("Search for Python")
```

Python REPL
โšก LLM call: 1204ms
โšก LLM call: 831ms
"I found results for Python..."

Tip: Event handlers receive the agent instance, giving you full access to current_session, messages, trace, and more.

Group multiple handlers

You can pass multiple handlers to the same event type:

main.py
```python
from connectonion import Agent, before_each_tool

def check_shell(agent):
    ...

def check_email(agent):
    ...

agent = Agent(
    "assistant",
    on_events=[
        before_each_tool(check_shell, check_email),  # group handlers for the same event
    ]
)
```

Python REPL
# Both handlers fire before each tool execution
# Cleaner than listing them separately

Note: Decorator Syntax
You can also use @before_each_tool decorator instead of before_each_tool(fn). We recommend wrapper style because it's easier for LLMs to understand when reading your code. But if you prefer decorators, they work too.
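The two spellings are interchangeable because the wrapper simply takes a function and returns an event handler object. A minimal stand-in makes this concrete (hypothetical internals for illustration only, not ConnectOnion's actual implementation):

```python
# Hypothetical stand-in for an event wrapper; ConnectOnion's real
# EventHandler type differs, but the two call shapes are the same.
def before_each_tool(*funcs):
    # Called with one or more functions, returns a handler record.
    return {'event': 'before_each_tool', 'funcs': funcs}

def check_shell(agent):
    pass

# Wrapper style: pass the function explicitly.
handler_a = before_each_tool(check_shell)

# Decorator style: @before_each_tool makes the same call,
# so the name check_email is rebound to the handler record.
@before_each_tool
def check_email(agent):
    pass
```

Either way, the object you put in on_events is the wrapper's return value.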

All Event Types

Here's when each event fires and what you can do with it:

after_user_input

Fires once per turn, after user input is added

main.py
```python
from connectonion import Agent, after_user_input

def add_timestamp(agent):
    from datetime import datetime
    timestamp = datetime.now().strftime("%H:%M:%S")
    agent.current_session['messages'].append({
        'role': 'system',
        'content': f'Current time: {timestamp}'
    })

agent = Agent("assistant", on_events=[
    after_user_input(add_timestamp)
])
```

Python REPL
# LLM now sees timestamp in context
# Useful for: time-aware agents, logging, session metadata

before_iteration

Fires before each iteration starts (poll IO, check mode changes)

main.py
```python
from connectonion import Agent, before_iteration

def check_mode_change(agent):
    """Poll for mode changes before iteration"""
    # Check whether the IO has a mode-change request
    if hasattr(agent.io, 'poll_mode_change'):
        new_mode = agent.io.poll_mode_change()
        if new_mode:
            agent.current_session['mode'] = new_mode
            print(f"๐Ÿ”„ Mode changed to: {new_mode}")

agent = Agent("assistant", on_events=[
    before_iteration(check_mode_change)
])
```

Python REPL
# Fires before each iteration loop starts
# Useful for: polling IO, mode changes, iteration setup

after_llm

Fires after each LLM response (multiple times per turn)

main.py
```python
from connectonion import Agent, after_llm, llm_do

def add_reflection(agent):
    """Add AI-generated reflection after tools execute"""
    trace = agent.current_session['trace']

    # Find tool executions since the second-most-recent LLM call
    recent_tools = []
    llm_count = 0
    for entry in reversed(trace):
        if entry.get('type') == 'llm_call':
            llm_count += 1
            if llm_count >= 2:
                break
        elif entry.get('type') == 'tool_execution':
            recent_tools.append(entry)

    if recent_tools:
        result = recent_tools[0]['result'][:200]
        reflection = llm_do(
            f"Reflect on this result: {result}",
            model="gpt-4o-mini"
        )
        # Inject as assistant message (safe timing after tools)
        agent.current_session['messages'].append({
            'role': 'assistant',
            'content': f"๐Ÿ’ญ {reflection}"
        })

agent = Agent("assistant", tools=[search], on_events=[
    after_llm(add_reflection)
])
```

Python REPL
๐Ÿ’ญ The search results provide comprehensive information about AI...
# Useful for: reflection, chain-of-thought, meta-cognition

before_each_tool

Fires before EACH individual tool execution

main.py
```python
from connectonion import Agent, before_each_tool

def validate_tool(agent):
    """Validate tool before execution"""
    pending = agent.current_session['pending_tool']
    tool_name = pending['name']
    print(f"๐Ÿ”ง About to run: {tool_name}")
    # Raise an exception here to cancel execution

agent = Agent("assistant", tools=[search], on_events=[
    before_each_tool(validate_tool)
])
```

Python REPL
๐Ÿ”ง About to run: search
# Useful for: validation, approval prompts, logging

before_tools

Fires ONCE before ALL tools in a batch

main.py
```python
from connectonion import Agent, before_tools

def log_batch_start(agent):
    """Log the start of a tool execution batch"""
    print("๐Ÿ”„ Starting tool execution batch...")

agent = Agent("assistant", tools=[search, analyze], on_events=[
    before_tools(log_batch_start)
])
```

Python REPL
๐Ÿ”„ Starting tool execution batch...
# Useful for: batch validation, batch-level logging

after_each_tool

Fires after EACH tool (logging only, NOT for messages)

WARNING: Do NOT add messages here! This breaks Anthropic Claude's API message ordering.

main.py
```python
from connectonion import Agent, after_each_tool

def log_tool_timing(agent):
    """Log each tool's execution time"""
    trace = agent.current_session['trace'][-1]
    if trace['type'] == 'tool_execution':
        timing = trace['timing']
        print(f"๐Ÿ”ง {trace['tool_name']}: {timing:.0f}ms")

agent = Agent("assistant", tools=[search, analyze], on_events=[
    after_each_tool(log_tool_timing)
])
```

Python REPL
๐Ÿ”ง search: 245ms
๐Ÿ”ง analyze: 1842ms
# Useful for: timing, performance logging

after_tools

Fires ONCE after ALL tools complete (safe for messages)

SAFE: This is the correct place to add reflection messages after tools.

main.py
```python
from connectonion import Agent, after_tools

def add_reflection(agent):
    """Add reflection after all tools complete"""
    trace = agent.current_session['trace']
    recent_tools = [t for t in trace if t['type'] == 'tool_execution'][-3:]
    if recent_tools:
        agent.current_session['messages'].append({
            'role': 'assistant',
            'content': f"Completed {len(recent_tools)} tools"
        })

agent = Agent("assistant", tools=[search, analyze], on_events=[
    after_tools(add_reflection)
])
```

Python REPL
Completed 2 tools
# Useful for: reflection, summarization, message injection

on_error

Fires when tool execution fails or tool not found

main.py
```python
from connectonion import Agent, on_error

def handle_errors(agent):
    """Custom error handling"""
    trace = agent.current_session['trace'][-1]
    if trace.get('status') in ('error', 'not_found'):
        error = trace.get('error', 'Unknown error')
        print(f"โŒ Error: {error}")

        # Log to a monitoring service
        # Add recovery instructions to messages
        # Implement retry logic

agent = Agent("assistant", tools=[api_call], on_events=[
    on_error(handle_errors)
])
```

Python REPL
โŒ Error: API rate limit exceeded
# Useful for: error logging, retry logic, fallback behavior

after_iteration

Fires after each iteration (checkpoints, can control loop continuation)

main.py
```python
from connectonion import Agent, after_iteration

def save_checkpoint(agent):
    """Save a checkpoint after each iteration"""
    iteration = agent.current_session['iteration']
    trace = agent.current_session['trace']

    # Save state to a file (save_to_file is your own helper)
    checkpoint = {
        'iteration': iteration,
        'messages': agent.current_session['messages'],
        'trace': trace
    }
    save_to_file(f'.co/checkpoint_{iteration}.json', checkpoint)
    print(f"๐Ÿ’พ Checkpoint saved: iteration {iteration}")

agent = Agent("assistant", tools=[search], on_events=[
    after_iteration(save_checkpoint)
])
```

Python REPL
๐Ÿ’พ Checkpoint saved: iteration 1
๐Ÿ’พ Checkpoint saved: iteration 2
# Useful for: checkpoints, stopping loop, tracking iterations

on_stop_signal

Fires when stop_signal is set (cleanup interrupted operations)

Note: Mutually exclusive with on_complete - either this fires (interrupted) OR on_complete fires (normal completion), never both.

main.py
```python
from connectonion import Agent, on_stop_signal

def cleanup_interrupted_work(agent):
    """Clean up when the user interrupts an operation"""
    trace = agent.current_session['trace']

    # Roll back files written this turn (restore_from_backup is your own helper)
    files_modified = [
        t['args']['file_path']
        for t in trace
        if t.get('tool_name') == 'write' and t.get('status') == 'success'
    ]

    for file_path in files_modified:
        restore_from_backup(file_path)
        print(f"โช Rolled back: {file_path}")

    print("โœ… Cleanup complete - ready for new input")

agent = Agent("assistant", tools=[write, read], on_events=[
    on_stop_signal(cleanup_interrupted_work)
])
```

Python REPL
โช Rolled back: config.json
โœ… Cleanup complete - ready for new input
# Useful for: rollback, cleanup, save checkpoints, notify user

on_complete

Fires once after agent finishes task

main.py
```python
from connectonion import Agent, on_complete

def log_completion(agent):
    """Log task completion with stats"""
    trace = agent.current_session['trace']

    llm_calls = sum(1 for t in trace if t['type'] == 'llm_call')
    tool_calls = sum(1 for t in trace if t['type'] == 'tool_execution')
    errors = sum(1 for t in trace if t.get('status') == 'error')

    print(f"โœ… Task complete: {llm_calls} LLM calls, {tool_calls} tools, {errors} errors")

agent = Agent("assistant", tools=[search], on_events=[
    on_complete(log_completion)
])
```

Python REPL
โœ… Task complete: 2 LLM calls, 1 tools, 0 errors
# Useful for: metrics, cleanup, notifications, logging

Combining Multiple Events

Use multiple event handlers together for comprehensive monitoring and control:

main.py
```python
from connectonion import (
    Agent, after_user_input, before_iteration, after_llm, after_each_tool,
    on_error, after_iteration, on_stop_signal, on_complete
)
from datetime import datetime

def log_session_start(agent):
    print(f"๐Ÿ“ Session started at {datetime.now()}")

def check_iteration(agent):
    iteration = agent.current_session.get('iteration', 0)
    print(f"๐Ÿ”„ Starting iteration {iteration}")

def track_llm(agent):
    trace = agent.current_session['trace'][-1]
    if trace['type'] == 'llm_call':
        print(f"โšก LLM: {trace['duration_ms']:.0f}ms")

def track_tools(agent):
    trace = agent.current_session['trace'][-1]
    if trace['type'] == 'tool_execution':
        print(f"๐Ÿ”ง Tool: {trace['tool_name']}")

def handle_errors(agent):
    trace = agent.current_session['trace'][-1]
    print(f"โŒ Error: {trace.get('error')}")

def save_checkpoint(agent):
    iteration = agent.current_session.get('iteration', 0)
    print(f"๐Ÿ’พ Checkpoint: iteration {iteration}")

def handle_interruption(agent):
    print("โš ๏ธ Operation interrupted - cleaning up")

def log_completion(agent):
    print("โœ… Task complete")

agent = Agent(
    "full_monitoring",
    tools=[search, analyze],
    on_events=[
        after_user_input(log_session_start),
        before_iteration(check_iteration),
        after_llm(track_llm),
        after_each_tool(track_tools),
        on_error(handle_errors),
        after_iteration(save_checkpoint),
        on_stop_signal(handle_interruption),
        on_complete(log_completion)
    ]
)

agent.input("Search and analyze Python")
```

Python REPL
๐Ÿ“ Session started at 2025-01-04 15:30:42
โšก LLM: 1204ms
๐Ÿ”ง Tool: search
โšก LLM: 831ms
๐Ÿ”ง Tool: analyze
โšก LLM: 1142ms
โœ… Task complete
"Analysis complete..."

Key Concepts

Event Handler Signature

All event handlers receive the agent instance:

main.py
```python
def my_event_handler(agent: Agent) -> None:
    # Access agent state
    messages = agent.current_session['messages']
    trace = agent.current_session['trace']
    user_prompt = agent.current_session['user_prompt']
    iteration = agent.current_session['iteration']

    # Modify agent state
    messages.append({'role': 'system', 'content': 'Context'})

    # Access agent attributes
    tool_names = agent.list_tools()
    model = agent.llm.model
```

Python REPL
# Event handlers are regular Python functions
# Full access to agent internals
# Can read AND modify agent state

Message Injection Timing

Important: Use after_tools to inject messages after tool execution:

โŒ Don't use after_each_tool: Injecting messages during tool execution breaks Anthropic Claude's message sequence (all tool_results must follow tool_use)

โœ… Use after_tools: Fires once after ALL tool results are added to messages, safe for reflection injection
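The ordering constraint can be sketched as a check over raw message dicts: Anthropic's Messages API requires every tool_use block in an assistant message to be answered by a matching tool_result in the message that immediately follows. This validator and the two sequences below are illustrative structures, not ConnectOnion internals:

```python
def tool_results_follow_tool_use(messages):
    """Return True if every tool_use is answered in the very next message."""
    for i, msg in enumerate(messages):
        uses = [b['id'] for b in msg.get('content', [])
                if isinstance(b, dict) and b.get('type') == 'tool_use']
        if not uses:
            continue
        nxt = messages[i + 1] if i + 1 < len(messages) else {}
        results = {b.get('tool_use_id') for b in nxt.get('content', [])
                   if isinstance(b, dict) and b.get('type') == 'tool_result'}
        if not all(u in results for u in uses):
            return False
    return True

ok = [
    {'role': 'assistant', 'content': [{'type': 'tool_use', 'id': 'tu_1'}]},
    {'role': 'user', 'content': [{'type': 'tool_result', 'tool_use_id': 'tu_1'}]},
]
# A message appended by after_each_tool lands between the pair and splits it:
broken = [ok[0], {'role': 'assistant', 'content': 'Reflection...'}, ok[1]]
```

The `ok` sequence is valid; `broken` is what an after_each_tool injection produces, which the API rejects.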

Error Handling

Event handlers follow fail-fast principle:

main.py
```python
from connectonion import Agent, after_llm

def failing_event(agent):
    raise RuntimeError("Event failed")

agent = Agent("test", on_events=[
    after_llm(failing_event)
])

agent.input("test")  # Raises RuntimeError
```

Python REPL
RuntimeError: Event failed
# Exceptions propagate - agents stop on event errors
# Design events to be robust or handle exceptions internally

Real-World Use Cases

1. Performance Monitoring Dashboard

main.py
```python
from connectonion import Agent, after_llm, after_each_tool, on_error

class PerformanceMonitor:
    def __init__(self):
        self.metrics = {
            'llm_calls': 0,
            'tool_calls': 0,
            'total_llm_time': 0,
            'total_tool_time': 0,
            'errors': 0
        }

    def track_llm(self, agent):
        trace = agent.current_session['trace'][-1]
        if trace['type'] == 'llm_call':
            self.metrics['llm_calls'] += 1
            self.metrics['total_llm_time'] += trace['duration_ms']

    def track_tool(self, agent):
        trace = agent.current_session['trace'][-1]
        if trace['type'] == 'tool_execution':
            self.metrics['tool_calls'] += 1
            self.metrics['total_tool_time'] += trace['timing']

    def track_error(self, agent):
        self.metrics['errors'] += 1

    def report(self):
        print(f"LLM calls: {self.metrics['llm_calls']}")
        print(f"Avg LLM time: {self.metrics['total_llm_time'] / max(1, self.metrics['llm_calls']):.0f}ms")
        print(f"Tool calls: {self.metrics['tool_calls']}")
        print(f"Errors: {self.metrics['errors']}")

monitor = PerformanceMonitor()
agent = Agent("monitored", tools=[search], on_events=[
    after_llm(monitor.track_llm),
    after_each_tool(monitor.track_tool),
    on_error(monitor.track_error)
])

agent.input("Complex task...")
monitor.report()
```

Python REPL
LLM calls: 3
Avg LLM time: 1245ms
Tool calls: 2
Errors: 0

2. Automatic Context Injection

main.py
```python
from connectonion import Agent, after_user_input

def inject_company_context(agent):
    """Add company-specific context to every query"""
    agent.current_session['messages'].append({
        'role': 'system',
        'content': '''You are a customer support agent for Acme Corp.
- Be friendly and professional
- Reference our 30-day return policy
- Escalate billing issues to the finance team'''
    })

agent = Agent(
    "support_agent",
    tools=[search_knowledge_base, create_ticket],
    on_events=[after_user_input(inject_company_context)]
)
```

Python REPL
# Every user query now includes company context
# LLM follows company policies automatically
# No need to repeat instructions in every prompt

3. Smart Retry Logic

main.py
```python
from connectonion import Agent, on_error

class RetryHandler:
    def __init__(self, max_retries=3):
        self.max_retries = max_retries
        self.retry_count = {}

    def handle_error(self, agent):
        trace = agent.current_session['trace'][-1]
        tool_name = trace.get('tool_name')

        # Track retries per tool
        if tool_name not in self.retry_count:
            self.retry_count[tool_name] = 0

        self.retry_count[tool_name] += 1

        if self.retry_count[tool_name] < self.max_retries:
            # Add a retry instruction to messages
            agent.current_session['messages'].append({
                'role': 'system',
                'content': f'Previous {tool_name} failed. Try with different parameters.'
            })
            print(f"๐Ÿ”„ Retry {self.retry_count[tool_name]}/{self.max_retries}")
        else:
            print(f"โŒ Max retries reached for {tool_name}")

retry_handler = RetryHandler()
agent = Agent("resilient", tools=[flaky_api], on_events=[
    on_error(retry_handler.handle_error)
])
```

Python REPL
๐Ÿ”„ Retry 1/3
๐Ÿ”„ Retry 2/3
โœ“ Success on retry 2

API Reference

Event Wrapper Functions

after_user_input(func: Callable[[Agent], None]) โ†’ EventHandler

Fires once per turn after user input is added to session.

before_iteration(func: Callable[[Agent], None]) โ†’ EventHandler

Fires before each iteration starts (poll IO, check mode changes).

before_llm(func: Callable[[Agent], None]) โ†’ EventHandler

Fires before each LLM call.

after_llm(func: Callable[[Agent], None]) โ†’ EventHandler

Fires after each LLM response.

before_each_tool(func: Callable[[Agent], None]) โ†’ EventHandler

Fires before EACH individual tool execution. Access pending tool via agent.current_session['pending_tool'].

before_tools(func: Callable[[Agent], None]) โ†’ EventHandler

Fires ONCE before ALL tools in a batch execute.

after_each_tool(func: Callable[[Agent], None]) โ†’ EventHandler

Fires after EACH individual tool. WARNING: Do NOT add messages here!

after_tools(func: Callable[[Agent], None]) โ†’ EventHandler

Fires ONCE after ALL tools complete. SAFE for adding messages.

on_error(func: Callable[[Agent], None]) โ†’ EventHandler

Fires when tool execution fails or tool is not found.

after_iteration(func: Callable[[Agent], None]) โ†’ EventHandler

Fires after each iteration (checkpoints, can control loop continuation).

on_stop_signal(func: Callable[[Agent], None]) โ†’ EventHandler

Fires when stop_signal is set (cleanup interrupted operations). Mutually exclusive with on_complete.

on_complete(func: Callable[[Agent], None]) โ†’ EventHandler

Fires once after agent completes the task.

Agent Constructor

Agent(name, tools, on_events: Optional[List[EventHandler]] = None, ...)

on_events: a list of handlers, each created by one of the event wrapper functions above

Best Practices

โœ… Keep handlers simple: Each event handler should do one thing well. Compose multiple handlers for complex behavior.

โœ… Use after_tools for message injection: This is the safe time to inject reflection/context after ALL tools in a batch complete.

โœ… Handle exceptions internally: If your event handler can fail, catch exceptions to prevent stopping the agent.

โŒ Don't inject during tool execution: Using after_each_tool to inject messages breaks Anthropic Claude's tool_result message ordering.

โŒ Don't do heavy computation: Event handlers run synchronously and block agent execution. Keep them fast.

Next Steps

Enjoying ConnectOnion?

โญ Star us on GitHub = โ˜• Coffee chat with our founder. We love meeting builders.