LangGraph persistence
LangGraph persistence
关于langgraph的持久化学习记录
checkpoint
首先compile a graph with a checkpointer(简写成ckpter吧),这个ckpter会在每个步骤去保存一个ckpt,ckpts都是存放到一个线程thread里面。
StateSnapshot对象保存每个步骤的图状态
thread
调用graph就需要指定线程id
{"configurable": {"thread_id": "1"}}
示例代码
主要通过代码看下可以实现的功能吧
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from typing import Annotated
from typing_extensions import TypedDict
from operator import add
class State(TypedDict):
foo: str
bar: Annotated[list[str], add] # 这里加上了reducer函数
def node_a(state: State):
return {"foo": "a", "bar": ["a"]}
def node_b(state: State):
return {"foo": "b", "bar": ["b"]}
workflow = StateGraph(State)
workflow.add_node(node_a)
workflow.add_node(node_b)
workflow.add_edge(START, "node_a")
workflow.add_edge("node_a", "node_b")
workflow.add_edge("node_b", END)
checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)
# 先指定线程id
config = {"configurable": {"thread_id": "1"}}
graph.invoke({"foo": ""}, config)
# get the latest state snapshot
config = {"configurable": {"thread_id": "1"}}
graph.get_state(config)
# get a state snapshot for a specific checkpoint_id
config = {"configurable": {"thread_id": "1", "checkpoint_id": "1ef663ba-28fe-6528-8002-5a559208592c"}}
graph.get_state(config)
# 拿全部的state记录
config = {"configurable": {"thread_id": "1"}}
list(graph.get_state_history(config))
# play-back 回放
config = {"configurable": {"thread_id": "1", "checkpoint_id": "0c62ca34-ac19-445d-bbb0-5b4984975b2a"}}
graph.invoke(None, config=config)
# 更新某个节点的参数
# update_state(config,values,as_node)
# config如果只传thread_id则更新当前的state,加上ckpt会fork选择的检查点
# value就是state的值,这里的就是有一点要注意会根据参数的reducer去覆盖或者追加
graph.update_state(config, {"foo": 2, "bar": ["c"]})
# 这里我更新完state就变成了{'foo': 2, 'bar': ['a', 'b', 'c']}
# as_node的用法,指定node名称在node节点更新数据
graph.update_state(config, {"foo": 2, "bar": ["d"]}, as_node="node_a")
graph.get_state(config)
memory
因为ckpter不能跨线程,所以来个memory存储
from langgraph.store.memory import InMemoryStore
import uuid
store = InMemoryStore()
user_id = "qingchen"
namespace_for_memory = (user_id, "memories")
memory_id = str(uuid.uuid4())
memory = {"fans": "jay chou"}
store.put(namespace_for_memory, memory_id, memory)
memories = store.search(namespace_for_memory)
memories[-1]
语义搜索
# 语义搜索
from langchain_qingchen.data.embedding.QingchenEmbeddings import QingchenEmbeddings
store = InMemoryStore(
index={
"embed": QingchenEmbeddings(), # Embedding provider
"dims": 1024, # Embedding dimensions
"fields": ["fans", "$"] # Fields to embed
}
)
user_id = "qingchen"
namespace_for_memory = (user_id, "memories")
memory_id = str(uuid.uuid4())
memory = {"fans": "I'm jay chou's fans"}
store.put(namespace_for_memory, memory_id, memory)
# Find memories about jay chou
# (This can be done after putting memories into the store)
memories = store.search(
namespace_for_memory,
query="Whose fan is it?",
limit=3 # Return top 3 matches
)
memories