Langgraph学习1:基础流程

最近开始学习Langgraph,决定从基础的chatbot开始,一步步记录一下学习过程

1. LLM免费API调用方式

开始构建chatbot的第一步是选择合适的大语言模型。为了节省费用,这里尝试了两种不同平台的的免费LLM API的调用方式:百度千帆和硅基流动。

百度千帆调用

这是demo中目前使用的方式,主要通过QianfanChatEndpoint来调用百度的ERNIE-Speed-128K免费模型:

import os
from langchain_community.chat_models import QianfanChatEndpoint

llm = QianfanChatEndpoint(
    model="ERNIE-Speed-128K",
    streaming=True,  # 启用流式输出
    api_key=os.getenv('QIANFAN_AK', ''),
    secret_key=os.getenv('QIANFAN_SK', '')
)

硅基流动调用方式

另一种方式是使用硅基流动的API,通过ChatOpenAI来调用国产的GLM或Qwen等免费模型:

from langchain_openai import ChatOpenAI
llm = ChatOpenAI(
    #THUDM/glm-4-9b-chat
    #Qwen/Qwen2.5-7B-Instruct
    model="THUDM/glm-4-9b-chat",
    streaming=False,
    api_key=os.getenv('SILICONFLOW_API_KEY', ''), 
    base_url=os.getenv('SILICONFLOW_BASE_URL', ''),
    temperature=0.1,
)

2. 构建基础Langgraph工作流

接下来定义一个简单的chat节点函数和工作流:

from datetime import datetime
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, AIMessageChunk
from langgraph.graph import Graph, StateGraph, MessagesState, END
import asyncio
from langchain_core.runnables.graph import MermaidDrawMethod

# llm的调用
async def chat_bot(state: MessagesState):
    """生成流式回复的节点函数"""
    messages = state["messages"]
    # 使用非流式方式接收完整返回
    response = await llm.ainvoke(messages)
    return {"messages": [response]}

# 创建工作流程
workflow = StateGraph(MessagesState)
# 添加节点
workflow.add_node("chat_bot", chat_bot)
# 设置入口节点
workflow.set_entry_point("chat_bot")
# 添加边,从chat_bot节点到end节点
workflow.add_edge("chat_bot", END)
# 编译图
app_graph = workflow.compile()

在这个工作流中,只设置了一个chat_bot节点,它接收消息并返回LLM的回复。工作流的结构很简单:入口 -> chat_bot -> 结束。

3. 两种流式输出方式

在Langgraph中,有多种调试和输出方式,这里尝试了两种不同的流式输出方式:

async def run_streaming_chain():
    """运行graph的链"""
    print("开始生成回复...\n")
    messages = [
        SystemMessage(content="你是一个智能助手,使用专业且准确的语言回复用户的问题,且使用中文进行回复"),
        HumanMessage(content="什么花最丑")
    ]

    # 初始化状态
    initial_state = {"messages": messages, "streamed_output": []}

    # stream_mode values的效果
    async for event in app_graph.astream(initial_state, config={"configurable": {"thread_id": "1"}}, stream_mode="values"):
        if "messages" in event:
            event["messages"][-1].pretty_print()
        pass

这种方式会返回完整的消息对象,适合需要处理完整状态更新的场景。

第二种:stream_mode=”messages”

这种方式可以获取真正的流式输出片段:

# stream_mode messages的流式效果
    async for event in app_graph.astream(initial_state, stream_mode='messages'):
        # print('event------>',event,'\n\n')
        if isinstance(event, tuple):
            chunk: AIMessageChunk = event[0]
            if chunk.type == 'AIMessageChunk':
                print('event里监听到的流式输出------>',chunk.content,'\n\n')

使用messages模式可以获取每个小片段的输出,适合需要实时展示生成内容的场景,比如打字机效果。

4.使用MermaidDrawMethod可视化工作流

最后,将构建的Langgraph工作流可视化出来,便于理解和记录:

# 定义一个将图导出为PNG的函数
def export_graph_to_png():
    """
    将LangGraph图导出为PNG格式
    Returns:
        str: 生成的PNG文件路径
    """
    try:
        output_file='简单的chatbot-'+datetime.now().strftime("%Y-%m-%d_%H-%M-%S")+".png"
        app_graph.get_graph().draw_mermaid_png(
            draw_method=MermaidDrawMethod.API,
            output_file_path=output_file
        )
    except Exception as e:
        print(f"导出PNG图形时出错: {e}")
        return None

在这个函数中,使用MermaidDrawMethod.API方法生成了工作流的可视化图片。Langgraph支持多种可视化方式,API方式是其中比较常用的一种,它会调用Mermaid的在线服务生成图片。生成的图片名称会根据以上代码逻辑以包含时间戳,便于区分不同批次运行后的结果。

完整代码如下:

import os
from datetime import datetime
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, AIMessageChunk
from langgraph.graph import Graph, StateGraph, MessagesState, END
from langchain_community.chat_models import QianfanChatEndpoint
import asyncio
from langchain_core.runnables.graph import MermaidDrawMethod

# 百度千帆的调用方式
llm = QianfanChatEndpoint(
    model="ERNIE-Speed-128K",
    streaming=True,  # 启用流式输出
    api_key=os.getenv('QIANFAN_AK', ''),
    secret_key=os.getenv('QIANFAN_SK', '')
)

# 硅基流动的api调用方式
# llm = ChatOpenAI(
#     #THUDM/glm-4-9b-chat
#     #Qwen/Qwen2.5-7B-Instruct
#     model="THUDM/glm-4-9b-chat",
#     streaming=False,  # 启用流式输出
#     api_key=os.getenv('SILICONFLOW_API_KEY', ''), 
#     base_url=os.getenv('SILICONFLOW_BASE_URL', ''),
#     temperature=0.1,
# )

# llm的调用
async def chat_bot(state: MessagesState):
    """生成流式回复的节点函数"""
    messages = state["messages"]
   # 使用非流式方式接收完整返回
    response = await llm.ainvoke(messages)
    return {"messages": [response]}

# 创建工作流程
workflow = StateGraph(MessagesState)
# 添加节点
workflow.add_node("chat_bot", chat_bot)
# 设置入口节点
workflow.set_entry_point("chat_bot")
# 添加边,从chat_bot节点到end节点
workflow.add_edge("chat_bot", END)
# 编译图
app_graph = workflow.compile()

# 定义一个将图导出为PNG的函数
def export_graph_to_png():
    """
    将LangGraph图导出为PNG格式
    Returns:
        str: 生成的PNG文件路径
    """
    try:
        output_file='简单的chatbot-'+datetime.now().strftime("%Y-%m-%d_%H-%M-%S")+".png"
        app_graph.get_graph().draw_mermaid_png(
            draw_method=MermaidDrawMethod.API,
            output_file_path=output_file
        )
        # return True
    except Exception as e:
        print(f"导出PNG图形时出错: {e}")
        return None

# 测试运行函数
async def run_streaming_chain():
    """运行graph的链"""
    print("开始生成回复...\n")
    messages = [
        SystemMessage(content="你是一个智能助手,使用专业且准确的语言回复用户的问题,且使用中文进行回复"),
        HumanMessage(content="什么花最丑")
    ]

    # 初始化状态
    initial_state = {"messages": messages, "streamed_output": []}

    # stream_mode values的效果
    async for event in app_graph.astream(initial_state, config={"configurable": {"thread_id": "1"}}, stream_mode="values"):
        # print('event------>',event,'\n\n')
        if "messages" in event:
            event["messages"][-1].pretty_print()
        pass

    # stream_mode messages的流式效果
    # async for event in app_graph.astream(initial_state, stream_mode='messages'):
    #     # print('event------>',event,'\n\n')
    #     if isinstance(event, tuple):
    #         chunk: AIMessageChunk = event[0]
    #         if chunk.type == 'AIMessageChunk':
    #             print('event里监听到的流式输出------>',chunk.content,'\n\n')

    # print("\n回复完成")

    # 展示图形
    try:
        # 导出为PNG
        export_graph_to_png()
    except Exception as e:
        print(f"图表绘制出错: {e}")

# 运行流式输出
if __name__ == "__main__":
    asyncio.run(run_streaming_chain())