Multi-Agent Collaboration with Human-in-loop
Multi agents using LangGraph
Challenges with single agents
Single agent systems are not efficient for diverse tasks or for applications which may require multiple tools. Imagine input context size if have to use 100s of tools. Each tool has its own description and input/output schema. In such cases, it is difficult to use a single model that can handle all the tools.
Some of the common challenges are: - Infelxibility: our agentic application is limited to one LLM - Contextual overload - too much information in the context - Lack of parallel processing - Single point of failure
What gets covered in this lab:
we wil cover these aspects below: - Multi Agent
collaboration - Leverage memory for 'turn-by-turn' conversations - Leverage Tools like API's
and RAG
for searching for answers. - use Human In the loop for some critical workflows
Multi agents
In multi agent systems, each agent can have its own prompt, LLM and tools.
Benefits of multi agent systems: - Agent can be more efficient as it has its on focused tasks - Logical grouping of tools can give better results - Easy to manage prompts for individual agents - Each agent can be tested and evaluated separately
In this example we are going to use supervisor agentic pattern. In this pattern multiple agents are connected via supervisor agent and the conversation flows via the supervisor
agents but each agent has its own scratchpad.
The supervisor agent acts as a central coordinator in a multi-agent system, orchestrating the interactions between various specialized agents. It delegates tasks to the appropriate agents based on their capabilities and the requirements of the task at hand. This approach allows for more efficient processing as each agent can focus on its specific tasks, reducing the complexity and context overload that a single agent might face. The supervisor agent ensures that the system is flexible, scalable, and can handle diverse tasks by leveraging the strengths of individual agents. It also facilitates parallel processing and minimizes the risk of a single point of failure by distributing tasks across multiple agents.
Scenario
The below image shows the tools and the flow of data and Control as we progress with our Travel Assistant bot
Flight agent
The flight agent is designed to handle various tasks related to flight booking management. It utilizes a set of tools to perform operations such as searching for available flights, retrieving detailed booking information, modifying existing flight bookings, and canceling reservations. By leveraging these tools, the flight agent can efficiently manage flight-related queries and actions.
The flight agent is equipped with a variety of tools.
These tools include: - Flight Search Tool: Allows users to search for available flights between specified cities on a given date. It provides detailed information about airlines, departure and arrival times, flight duration, and pricing.
-
Booking Retrieval Tool: Enables the retrieval of detailed booking information using a booking ID. This tool is essential for users who need to review or confirm their flight details.
-
Booking Modification Tool: Offers the capability to modify existing flight bookings. Users can change flight dates, times, or even cancel reservations if needed.
-
Cancellation Tool: Facilitates the cancellation of flight bookings, ensuring that users can manage their travel plans with ease.
Flight Search tool
The search_flights
function is a tool designed to search for flights between two specified cities.
Purpose: This tool simulates a flight search engine, providing mock flight data for given departure and arrival cities on a specified date.
Note: This function is designed for demonstration and testing purposes, using randomly generated data rather than real flight information.
The tool provides a simulated flight search experience, allowing for the testing and development of flight booking systems or travel planning applications without accessing real flight data APIs.
Setup
# %pip install -U --no-cache-dir \
# "langchain==0.3.7" \
# "langchain-aws==0.2.6" \
# "langchain-community==0.3.5" \
# "langchain-text-splitters==0.3.2" \
# "langchainhub==0.1.20" \
# "langgraph==0.2.45" \
# "langgraph-checkpoint==2.0.2" \
# "langgraph-sdk==0.1.35" \
# "langsmith==0.1.140" \
# "pypdf==3.8,<4" \
# "ipywidgets>=7,<8" \
# "matplotlib==3.9.0"
import sqlite3
from contextlib import closing
from langchain_core.tools import tool
import random
from datetime import datetime, timedelta
from langgraph.prebuilt import ToolNode
import sqlite3
import pandas as pd
from langchain_core.runnables.config import RunnableConfig
def read_travel_data(file_path: str = "data/synthetic_travel_data.csv") -> pd.DataFrame:
"""Read travel data from CSV file"""
try:
df = pd.read_csv(file_path)
return df
except FileNotFoundError:
return pd.DataFrame(
columns=["Id", "Name", "Current_Location", "Age", "Past_Travel_Destinations", "Number_of_Trips", "Flight_Number", "Departure_City", "Arrival_City", "Flight_Date"]
)
@tool
def search_flights(config: RunnableConfig, arrival_city: str, date: str = None) -> str:
"""
Use this tool to search for flights between two cities. It knows the user's current location
Args:
arrival_city (str): The city of arrival
date (str, optional): The date of the flight in YYYY-MM-DD format. If not provided, defaults to 7 days from now.
Returns:
str: A formatted string containing flight information including airline, departure time, arrival time, duration, and price for multiple flights.
"""
df = read_travel_data()
user_id = config.get("configurable", {}).get("user_id")
if user_id not in df["Id"].values:
return "User not found in the travel database."
user_data = df[df["Id"] == user_id].iloc[0]
current_location = user_data["Current_Location"]
departure_city = current_location.capitalize()
arrival_city = arrival_city.capitalize()
if date is None:
date = (datetime.now() + timedelta(days=7)).strftime("%Y-%m-%d")
# Generate mock flight data
num_flights = random.randint(2, 5)
airlines = ["AirEurope", "SkyWings", "TransContinental", "EuroJet", "GlobalAir"]
flights = []
for _ in range(num_flights):
airline = random.choice(airlines)
duration = timedelta(minutes=2)
price = random.randint(100, 400)
departure_time = datetime.strptime(date, "%Y-%m-%d") + timedelta(
hours=random.randint(0, 23), minutes=random.randint(0, 59)
)
arrival_time = departure_time + duration
flights.append(
{
"airline": airline,
"departure": departure_time.strftime("%H:%M"),
"arrival": arrival_time.strftime("%H:%M"),
"duration": str(duration),
"price": price,
}
)
# Format the results
import json
flight_data = {
"departure_city": departure_city,
"arrival_city": arrival_city,
"date": date,
"flights": []
}
for i, flight in enumerate(flights, 1):
flight_info = {
"flight_number": i,
"airline": flight['airline'],
"departure": flight['departure'],
"arrival": flight['arrival'],
"duration": str(flight['duration']),
"price": flight['price']
}
flight_data["flights"].append(flight_info)
return json.dumps(flight_data) + " FINISHED"
Flight Booking Retrieval Tool
The retrieve_flight_booking
function is a tool designed to fetch flight booking information from a database.
Purpose: This tool retrieves flight booking details based on a provided booking ID
Note: This function is designed to work with a specific database schema, assuming a 'flight_bookings' table exists with 'booking_id' as a field.
@tool
def retrieve_flight_booking(booking_id: int) -> str:
"""
Retrieve a flight booking by ID
Args:
booking_id (int): The unique identifier of the booking to retrieve
Returns:
str: A string containing the booking information if found, or a message indicating no booking was found
"""
booking = None
with closing(sqlite3.connect("data/travel_bookings.db", timeout=10.0)) as conn:
with closing(conn.cursor()) as cursor:
# Execute the query to retrieve the booking
cursor.execute("SELECT * FROM flight_bookings WHERE booking_id = ?", (booking_id,))
booking = cursor.fetchone()
# Close the connection
conn.close()
if booking:
return f"Booking found: {booking} FINISHED"
else:
return f"No booking found with ID: {booking_id} FINISHED"
Change Flight Booking Tool
The change_flight_booking
function is a tool designed to change flight booking information in the database.
Purpose: This function allows for changing the departure date of an existing flight booking.
Note: This function assumes the existence of a 'flight_bookings' table with 'booking_id' and 'departure_date' columns.
@tool
def change_flight_booking(booking_id: int, new_date: str) -> str:
"""
Change the date of a flight booking
Args:
booking_id (int): The unique identifier of the booking to be changed
new_date (str): The new date for the booking
Returns:
str: A message indicating the result of the booking change operation
"""
# conn = sqlite3.connect("data/travel_bookings.db")
# cursor = conn.cursor()
result = ""
with closing(sqlite3.connect("data/travel_bookings.db", timeout=10.0)) as conn:
with closing(conn.cursor()) as cursor:
# Execute the query to update the booking date
cursor.execute(
"UPDATE flight_bookings SET departure_date = ? WHERE booking_id = ?",
(new_date, booking_id),
)
conn.commit()
# Check if the booking was updated
if cursor.rowcount > 0:
result = f"Booking updated with ID: {booking_id}, new date: {new_date} FINISHED"
else:
result = f"No booking found with ID: {booking_id} FINISHED"
# Close the connection
conn.close()
return result
Flight Cancellation tool
The cancel_flight_booking
function is a tool designed to cancel flight bookings in the database.
Purpose: This function is designed to cancel an existing flight booking in the system.
@tool
def cancel_flight_booking(booking_id: int) -> str:
"""
Cancel a flight booking. If the task complete, reply with "FINISHED"
Args:
booking_id (str): The unique identifier of the booking to be cancelled
Returns:
str: A message indicating the result of the booking cancellation operation
"""
# conn = sqlite3.connect("data/travel_bookings.db")
# cursor = conn.cursor()
result = ""
with closing(sqlite3.connect("data/travel_bookings.db", timeout=10.0)) as conn:
with closing(conn.cursor()) as cursor:
cursor.execute("DELETE FROM flight_bookings WHERE booking_id = ?", (booking_id,))
conn.commit()
# Check if the booking was deleted
if cursor.rowcount > 0:
result = f"Booking canceled with ID: {booking_id} FINISHED"
else:
result = f"No booking found with ID: {booking_id} FINISHED"
# Close the connection
conn.close()
return result
Language Model
The LLM powering all of our agent implementations in this lab will be Claude 3 Sonnet via Amazon Bedrock. For easy access to the model we are going to use ChatBedrockConverse class of LangChain, which is a wrapper around Bedrock's Converse API.
from langchain_aws import ChatBedrockConverse
from langchain_aws import ChatBedrock
import boto3
# ---- ⚠️ Update region for your AWS setup ⚠️ ----
bedrock_client = boto3.client("bedrock-runtime", region_name="us-west-2")
llm = ChatBedrockConverse(
model="anthropic.claude-3-haiku-20240307-v1:0",
temperature=0,
max_tokens=None,
client=bedrock_client,
# other params...
)
Flight Agent setup
We are going to use create_react_agent
to create a flight agent.
We can customize the prompt using state_modifier
In our case, the flight agent uses this framework to: - Interpret user queries about flights - Decide which tool (search, retrieve, change, or cancel) to use - Execute the chosen tool and interpret the results - Formulate responses based on the tool outputs
import functools
import operator
from typing import Sequence, TypedDict
from langchain_core.messages import BaseMessage
from langgraph.graph import END, StateGraph, START
from langgraph.prebuilt import create_react_agent
from typing import Annotated
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
class State(TypedDict):
messages: Annotated[list, add_messages]
next: str
memory = MemorySaver()
flight_agent = create_react_agent(
llm,
tools=[
search_flights,
retrieve_flight_booking,
change_flight_booking,
cancel_flight_booking,
],
state_modifier="""
First gather all the information required to call a tool.
If you are not able to find the booking the do not try again and just reply with "FINISHED".
If tool has returned the results then reply with "FINISHED"
If all tasks are complete, reply with "FINISHED"
""",
checkpointer=memory,
)
from IPython.display import Image, display
display(Image(flight_agent.get_graph().draw_mermaid_png()))
Testing the Flight Agent
Let's put the flight agent to the test with a sample query.
config = {"configurable": {"thread_id": "127", "user_id":578}}
ret_messages = flight_agent.invoke({"messages": [("user", "Find flight to Amsterdam")]}, config)
ret_messages['messages'][-1].pretty_print()
#- un coment i you want to see the full orchesteration including the tool calling
#ret_messages
Hotel Agent
Just like flight agent we need to create few tools, which can manage hotel bookings. We will use the same approach as we did with flight agents.
The Hotel Agent will be responsible for handling various hotel-related tasks, including: 1. Suggesting hotels based on city and check-in date 2. Retrieving hotel booking details 3. Modifying existing hotel bookings 4. Cancelling hotel reservations
These functionalities will be implemented as separate tools, similar to the Flight Agent. The Hotel Agent will use these tools to interact with a simulated hotel booking system.
Suggest hotel tool
The suggest_hotels
function is a tool designed to suggest hotels based on city and check-in date. It takes in a city name (e.g., "New York") and a check-in date (e.g., 2019-08-30) as input, and returns a list of suggested hotel names.
Purpose: This tool simulates a hotel booking system that suggests hotels based on city and check-in date.
Note: This function is designed for demonstration and testing purposes, using randomly generated data rather than real information from hotel booking system.
@tool
def suggest_hotels(city: str, checkin_date: str) -> dict:
"""
Use this tool to search for hotels in these cities
Args:
city (str): The name of the city to search for hotels
checkin_date (str): The check-in date in YYYY-MM-DD format
Returns:
dict: A dictionary containing:
- hotels (list): List of hotel names in the specified city
- checkin_date (str): The provided check-in date
- checkout_date (str): A randomly generated checkout date
- price (int): A randomly generated price for the stay
"""
hotels = {
"New York": ["Hotel A", "Hotel B", "Hotel C"],
"Paris": ["Hotel D", "Hotel E", "Hotel F"],
"Tokyo": ["Hotel G", "Hotel H", "Hotel I"],
}
# Generate random checkout date and price
checkin = datetime.strptime(checkin_date, "%Y-%m-%d")
checkout = checkin + timedelta(days=random.randint(1, 10))
price = random.randint(100, 500)
hotel_list = hotels.get(city, ["No hotels found"])
return {
"hotels": hotel_list,
"checkin_date": checkin_date,
"checkout_date": checkout.strftime("%Y-%m-%d"),
"price": price,
}
Hotel Booking Retrieval Tool
The retrieve_hotel_booking
function is a tool designed to fetch hotel booking information from a database.
Purpose: This tool retrieves hotel booking details based on a provided booking ID
Note: This function is designed to work with a specific database schema, assuming a 'hotel_bookings' table exists with 'booking_id' as a field.
@tool
def retrieve_hotel_booking(booking_id: int) -> str:
"""
Retrieve a hotel booking by ID
Args:
booking_id (int): The unique identifier of the hotel booking to retrieve
Returns:
str: A string containing the hotel booking information if found, or a message indicating no booking was found
"""
# conn = sqlite3.connect("data/travel_bookings.db")
# cursor = conn.cursor()
booking = ""
with closing(sqlite3.connect("./data/travel_bookings.db", timeout=10.0)) as conn:
with closing(conn.cursor()) as cursor:
cursor.execute(f"SELECT * FROM hotel_bookings WHERE booking_id='{booking_id}'")
booking = cursor.fetchone()
# Close the connection
conn.close()
if booking:
return f"Booking found: {booking}"
else:
return f"No booking found with ID: {booking_id}"
Change Hotel Booking Tool
The change_hotel_booking
function is a tool designed to change hotel booking information in the database.
Purpose: This function allows for changing the new checkin and checkout date for the existing booking in the database.
Note: This function assumes the existence of a 'hotel_bookings' table with 'booking_id' and 'check_in_date' columns.
from datetime import datetime
@tool
def change_hotel_booking(booking_id: int, new_checkin_date: str = None, new_checkout_date: str = None) -> str:
"""
Change the dates of a hotel booking in the database. If the task completes, reply with "FINISHED"
Args:
booking_id (int): The unique identifier of the booking to be changed
new_checkin_date (str, optional): The new check-in date in YYYY-MM-DD format
new_checkout_date (str, optional): The new check-out date in YYYY-MM-DD format
Returns:
str: A message indicating the result of the booking change operation
"""
# conn = sqlite3.connect("data/travel_bookings.db") # Replace with your actual database file
# cursor = conn.cursor()
with closing(sqlite3.connect("./data/travel_bookings.db", timeout=10.0)) as conn:
with closing(conn.cursor()) as cursor:
try:
# First, fetch the current booking details
cursor.execute(
"""
SELECT * FROM hotel_bookings WHERE booking_id = ?
""",
(booking_id,),
)
booking = cursor.fetchone()
if booking is None:
return f"No hotel booking found with ID: {booking_id}"
# Unpack the booking details
( _, user_id,user_name,city,hotel_name,check_in_date,check_out_date,nights,price_per_night,total_price,num_guests,room_type,) = booking
# Update check-in and check-out dates if provided
if new_checkin_date:
check_in_date = new_checkin_date
if new_checkout_date:
check_out_date = new_checkout_date
# Recalculate nights and total price
checkin = datetime.strptime(check_in_date, "%Y-%m-%d")
checkout = datetime.strptime(check_out_date, "%Y-%m-%d")
nights = (checkout - checkin).days
total_price = nights * price_per_night
# Update the booking in the database
cursor.execute(
"""
UPDATE hotel_bookings
SET check_in_date = ?, check_out_date = ?, nights = ?, total_price = ?
WHERE booking_id = ?
""",
(check_in_date, check_out_date, nights, total_price, booking_id),
)
conn.commit()
return f"Hotel booking updated: Booking ID {booking_id}, New check-in: {check_in_date}, New check-out: {check_out_date}, Nights: {nights}, Total Price: {total_price} FINISHED"
except sqlite3.Error as e:
conn.rollback()
return f"An error occurred: {str(e)} Booking ID {booking_id}, New check-in: {check_in_date} FINISHED"
finally:
conn.close()
Hotel Cancellation tool
The cancel_hotel_booking
function is a tool designed to cancel hotel bookings in the database.
Purpose: This function is designed to cancel an existing hotel booking in the system.
@tool
def cancel_hotel_booking(booking_id: int) -> str:
"""
Cancel a hotel booking. If the task completes, reply with "FINISHED"
Args:
booking_id (str): The unique identifier of the booking to be cancelled
Returns:
str: A message indicating the result of the booking cancellation operation
"""
# conn = sqlite3.connect("data/travel_bookings.db")
# cursor = conn.cursor()
result=""
with closing(sqlite3.connect("data/travel_bookings.db", timeout=10.0)) as conn:
with closing(conn.cursor()) as cursor:
cursor.execute("DELETE FROM hotel_bookings WHERE booking_id = ?", (booking_id,))
conn.commit()
# Check if the booking was deleted
if cursor.rowcount > 0:
result = f"Booking canceled with ID: {booking_id} FINISHED"
else:
result = f"No booking found with ID: {booking_id} FINISHED"
# Close the connection
conn.close()
return result
Hotel agent
So far we have seen how to create agent using create_react_agent
class of LangGraph, which has simplified things for us. But we need human confirmation before changing hotel booking or before cancelling hotel booking. We need our agent to ask ask for confirmation before executing these tools. We will create a custom agent that can handle these things. We will create a separate node that can handel booking cancellation or modification. Agent execution will be interrupted at this node to get the confirmation.
So we will create 2 separate nodes search
and then for cancel
where we need Human-in-the-loop
. The below diagram illustrates this
Hotel Booking Assistant Setup
The HumanApprovalToolNode
class is a custom implementation for managing hotel bookings. It extends the functionality of a standard LangChain agent by adding the ability to ask for additional information from the user when needed.
The hotel_agent
function is set up to invoke the agent which will be a Return of Control
style which will ask the application to execute the tool. The tools bound to this agent themselves can either ToolNode or our custom HumanApprovalToolNode allowing us to easily execute this step
This code sets up a hotel booking assistant using LangChain components. It includes:
- We have 2 distinct set of tools set up a nodes on the graph 1/ which need Human confirmation like cancel booking and 2/ which do not need human intervention
hotel_agent
function that manages interactions with the language model. This is set up with complete knowledge of all the tools- prompt template for the primary assistant, focused on hotel bookings.
- set of hotel-related tools for tasks like suggesting hotels and managing bookings.
- runnable object that combines the prompt, language model, and tools.
The assistant can handle hotel inquiries, make suggestions, and perform booking operations. It also has the ability to ask for additional information when needed.
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import Runnable, RunnableConfig
primary_assistant_prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"You are a helpful assistant who manage hotel bookings"
),
("placeholder", "{messages}"),
]
)
hotel_tools = [
suggest_hotels,
retrieve_hotel_booking,
change_hotel_booking,
cancel_hotel_booking,
]
runnable_with_tools = primary_assistant_prompt | llm.bind_tools(
hotel_tools
)
def hotel_agent(state: State):
return {"messages": [runnable_with_tools.invoke(state)]}
Now we will create 2 separate nodes for search_and_retrieve_node
and change_and_cancel_node
, so that we can interrupt for human approval when change_and_cancel_node
is executed.
import json
from langchain_core.messages import ToolMessage
class HumanApprovalToolNode:
"""A node that runs the tools requested in the last AIMessage."""
def __init__(self, tools: list) -> None:
self.tools_by_name = {tool.name: tool for tool in tools}
def __call__(self, inputs: dict):
if messages := inputs.get("messages", []):
message = messages[-1]
else:
raise ValueError("No message found in input")
outputs = []
for tool_call in message.tool_calls:
user_input = input(
"Do you approve of the above actions? Type 'y' to continue;"
" otherwise, explain your requested changed.\n\n"
)
if user_input.lower() == "y":
tool_result = self.tools_by_name[tool_call["name"]].invoke(
tool_call["args"]
)
outputs.append(
ToolMessage(
content=json.dumps(tool_result),
name=tool_call["name"],
tool_call_id=tool_call["id"],
)
)
else:
outputs.append(
ToolMessage(
content=f"API call denied by user. Reasoning: '{user_input}'. ",
name=tool_call["name"],
tool_call_id=tool_call["id"],
)
)
return {"messages": outputs}
from langgraph.prebuilt import ToolNode, tools_condition
search_and_retrieve_node = ToolNode([suggest_hotels, retrieve_hotel_booking])
change_and_cancel_node = HumanApprovalToolNode(
[change_hotel_booking, cancel_hotel_booking]
)
We need to check which node is executed next. This function can check the state and decide which node to execute next of end the execution.
def should_continue(state):
messages = state["messages"]
last_message = messages[-1]
# If there is no function call, then we finish
if not last_message.tool_calls:
return "end"
elif last_message.tool_calls[0]["name"] in [
"change_hotel_booking",
"cancel_hotel_booking",
]:
return "human_approval"
# Otherwise if there is, we continue
else:
return "continue"
Assembling the Hotel Agent Graph
Let's add all the nodes in the graph and compile it to create our custom hotel agent.
This graph will define the flow of our hotel booking system, including:
- The main hotel agent node for processing requests
- A tool node for executing search and retrieve hotel booking
- Another tool node for cancelling and changing hotel booking
The graph will use conditional edges to determine the next step based on the current state, allowing for a dynamic and responsive workflow. We'll also set up memory management to maintain state across interactions.
from langgraph.graph import END, StateGraph, MessagesState
from IPython.display import Image, display
# Create a new graph workflow
hotel_workflow = StateGraph(MessagesState)
hotel_workflow.add_node("hotel_agent", hotel_agent)
hotel_workflow.add_node("search_and_retrieve_node", search_and_retrieve_node)
hotel_workflow.add_node("change_and_cancel_node", change_and_cancel_node)
hotel_workflow.add_edge(START, "hotel_agent")
# We now add a conditional edge
hotel_workflow.add_conditional_edges(
"hotel_agent",
# Next, we pass in the function that will determine which node is called next.
should_continue,
{
# If agent decides to use `suggest_hotels` or `retrieve_hotel_booking`
"continue": "search_and_retrieve_node",
# If agent decides to use `change_hotel_booking` or `cancel_hotel_booking`
"human_approval": "change_and_cancel_node",
"end": END,
},
)
hotel_workflow.add_edge("search_and_retrieve_node", "hotel_agent")
hotel_workflow.add_edge("change_and_cancel_node", "hotel_agent")
# Set up memory
from langgraph.checkpoint.memory import MemorySaver
memory = MemorySaver()
hotel_graph_compiled = hotel_workflow.compile(
checkpointer=memory
)
display(Image(hotel_graph_compiled.get_graph().draw_mermaid_png()))
Testing the Custom Hotel Agent
Now we can test this agent
import uuid
from langchain_core.messages import ToolMessage
thread_id = str(uuid.uuid4())
_printed = set()
config = {"configurable": {"thread_id": thread_id}}
events = hotel_graph_compiled.stream(
{"messages": ("user", "Get details of my booking id 203")},
config,
stream_mode="values",
)
for event in events:
event["messages"][-1].pretty_print()
thread_id = str(uuid.uuid4())
config = {"configurable": {"thread_id": thread_id}}
events = hotel_graph_compiled.stream(
{"messages": ("user", "cancel my hotel booking id 203")},
config,
stream_mode="values",
)
for event in events:
event["messages"][-1].pretty_print()
Supervisor agent
Now its time to create supervisor agent that will be in charge of deciding which child agent to call based on the user input and based on the conversation history.
The Supervisor Agent is responsible for: 1. Analyzing the conversation history and user input 2. Deciding which child agent (flight_agent or hotel_agent) to call next 3. Determining when to finish the conversation
We will create this agent with LangChain runnable chain created using supervisor prompt. We need to get the next_step
from the chain and we use with_structured_output
to return next step.
The Supervisor Agent routes tasks and maintains the overall flow of the conversation between the user and child agents.
from langchain_core.runnables import Runnable, RunnableConfig
from langgraph.graph.message import add_messages
from typing import Annotated, Sequence, List
from langchain_core.messages import HumanMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core import __version__ as core_version
from packaging import version
core_version = version.parse(core_version)
if (core_version.major, core_version.minor) < (0, 3):
from pydantic.v1 import BaseModel
else:
from pydantic import BaseModel
from typing import Literal
members = ["flight_agent", "hotel_agent"]
options = ["FINISH"] + members
class routeResponse(BaseModel):
"""
Return next agent name.
"""
next: Literal[*options]
class AskHuman(BaseModel):
"""Ask missing information from the user"""
question: str
prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"""
Given the conversation below who should act next?
1. To search or cancel flight return 'flight_agent'
2. To search for hotel or cancel hotel booking return 'hotel_agent'
3. If you have the answer return 'FINISH'
4. When member has finished the task, and you notice FINISHED in the message then don't repeat same member again
5. Do not return next which is not related to user query. Example if user is asking about flight then do not call 'hotel_agent'
Or should we FINISH? ONLY return one of these {options}. Do not explain the process.
""",
),
("placeholder", "{messages}"),
]
).partial(options=str(options), members=", ".join(members))
supervisor_chain = prompt | llm.with_structured_output(routeResponse)
def supervisor_agent(state):
result = supervisor_chain.invoke(state)
output = {
"next": result.next,
"messages": [
HumanMessage(
content=f"Supervisor decided: {result.next}", name="supervisor"
)
],
}
print(f"Supervisor output: {output}")
return output
We can test our supervisor agent to check if it is returning correct next step based on the user input.
supervisor_agent({"messages": [("user", "I want to book a flight")]})
Assembling the Multi-Agent System
Now it's time to put all agents together in a workflow. We will start with the supervisor
.
class State(TypedDict):
messages: Annotated[list, add_messages]
next: str
full_workflow = StateGraph(State)
full_workflow.add_node("supervisor", supervisor_agent)
full_workflow.add_edge(START, "supervisor")
Creating the flight agent node
We need a helper function to take output from the flight search agent and add it to the messages list in the state.
from langchain_core.messages import AIMessage
def agent_node(state, agent, name):
result = agent.invoke(state)
return {
"messages": [HumanMessage(content=result["messages"][-1].content, name=name)]
}
flight_node = functools.partial(agent_node, agent=flight_agent, name="flight_agent")
Let's add this node to the workflow
full_workflow.add_node("flight_agent", flight_node)
Adding the Hotel Agent as a Subgraph
We can add hotel_agent
as subgraph to this workflow. This is is a good example of how to use subgraphs in workflows. This also give us more control over the workflow.
full_workflow.add_node("hotel_agent", hotel_graph_compiled)
Once we get the output from hotel agent we need to make sure that it has correct structure that supervisor agent can process. For this we need to add a node to the workflow that will process the output from hotel agent.
def process_output(state):
messages = state["messages"]
for message in reversed(messages):
if isinstance(message, AIMessage) and isinstance(message.content, str):
print(message.content)
return {
"messages": [
HumanMessage(content=message.content, name="hotel_agent")
]
}
return None
full_workflow.add_node("process_output", process_output)
Connecting the Agents
Now we can add edges to the workflow that will connect all the agents. We need to add an edge from the flight agent to the supervisor, and from the hotel agent to the process output node and then to the supervisor.
- We'll connect the
flight_agent
directly to thesupervisor
. - For the
hotel_agent
, we'll first connect it to theprocess_output
node, which will format its output. - The
process_output
node will then be connected to thesupervisor
. - We'll set up conditional edges from the supervisor to all other agents (including itself) and to a FINISH state.
full_workflow.add_edge("flight_agent", "supervisor")
full_workflow.add_edge("hotel_agent", "process_output")
full_workflow.add_edge("process_output", "supervisor")
conditional_map = {k: k for k in members}
conditional_map["FINISH"] = END
full_workflow.add_conditional_edges("supervisor", lambda x: x["next"], conditional_map)
from IPython.display import Image, display
supervisor_agent_graph = full_workflow.compile(
checkpointer=memory,
)
# display subgraph using xray=1
display(Image(supervisor_agent_graph.get_graph(xray=1).draw_mermaid_png()))
We need to create a utility function to extract tool id from the subgraph.
def extract_tool_id(pregel_task):
# Navigate to the messages in the state
messages = pregel_task.state.values.get("messages", [])
# Find the last AIMessage
for message in reversed(messages):
if isinstance(message, AIMessage):
# Check if the message has tool_calls
tool_calls = getattr(message, "tool_calls", None)
if tool_calls:
# Return the id of the first tool call
return tool_calls[0]["id"]
Testing full graph
Now we are ready to test the graph. We will create a unique thread_id to manage memory. We have few sample questions to test the graph.
thread_id = str(uuid.uuid4())
config = {"configurable": {"thread_id": thread_id}}
events = supervisor_agent_graph.stream(
{"messages": ("user", "Get details of my flight booking id 200")},
config,
stream_mode="values",
subgraphs=True,
)
for event in events:
event[1]["messages"][-1].pretty_print()
thread_id = str(uuid.uuid4())
config = {"configurable": {"thread_id": thread_id}}
events = supervisor_agent_graph.stream(
{"messages": ("user", "cancel my hotel booking id 193")},
config,
stream_mode="values",
subgraphs=True,
)
for event in events:
event[1]["messages"][-1].pretty_print()
in case you need to see the contents of the hotel database
hotel database
import sqlite3
from contextlib import closing
with closing(sqlite3.connect("data/travel_bookings.db", timeout=10.0)) as conn:
with closing(conn.cursor()) as cursor:
cursor.execute("""SELECT * FROM hotel_bookings""")
#cursor.execute("""SELECT * FROM hotel_bookings where booking_id='203'""")
for idx in range(5):
print(cursor.fetchone())
conn.close()
flights database
import sqlite3
from contextlib import closing
with closing(sqlite3.connect("data/travel_bookings.db", timeout=10.0)) as conn:
with closing(conn.cursor()) as cursor:
cursor.execute("""SELECT * FROM flight_bookings""")
#cursor.execute("""SELECT * FROM hotel_bookings where booking_id='203'""")
for idx in range(5):
print(cursor.fetchone())
conn.close()
Conclusion
In this lab we have seen implementation of a multi-agent system for travel booking using LangGraph. The implementation showcases several key concepts:
-
Multi-Agent Architecture: The system effectively divides responsibilities between specialized agents (Flight and Hotel agents) coordinated by a Supervisor agent, demonstrating how complex tasks can be broken down into manageable components
-
Subgraph Pattern: The Hotel agent implementation as a subgraph shows how complex agent behaviors can be encapsulated and managed independently, while still integrating seamlessly with the larger system
-
Tool Integration: Each agent has access to specific tools relevant to its domain, showing how specialized capabilities can be effectively distributed across different agents
-
Supervisor Pattern: The supervisor agent demonstrates effective orchestration of multiple specialized agents, making decisions about task routing and completion
-
State Management: The implementation shows how to manage state across multiple agents and handle complex interactions between them
Key benefits of this approach include: - Improved modularity and maintainability - Better separation of concerns - Efficient handling of domain-specific tasks - Flexible architecture that can be extended with additional agents
This pattern can be adapted for various complex applications where multiple specialized agents need to work together to accomplish a larger goal.