Langchain

PromptTemplate

提示词模板的实例化方式

from langchain_core.prompts import PromptTemplate

# Instantiation using from_template (recommended)
prompt = PromptTemplate.from_template("Say {foo}")
prompt.format(foo="bar")

# Instantiation using initializer
prompt = PromptTemplate(template="Say {foo}")

print(prompt.get_input_schema())
# <class 'langchain_core.utils.pydantic.PromptInput'>

example_prompt = PromptTemplate.from_examples(
    ["hello,world {user}!", "hello,python {user}!"],
    input_variables=["user"],
    example_separator="\n",
    suffix="hello",
    prefix="FUCK"
)

print(example_prompt.format(user="qingchen"))
# FUCK
# hello,world qingchen!
# hello,python qingchen!
# hello

template = PromptTemplate.from_template(
    "hhh{p1}{p2}{p3}",
    partial_variables={"p1": "x", },
)

QingchenModel

自己封装了的model方便切换模型

from langchain_openai import ChatOpenAI
from llm_env import get_env


class QingchenModel(ChatOpenAI):
    def __init__(
            self,
            cache=None,
            temperature=0.9,
            verbose=False,
            streaming=False,
            # api_key=get_env.load_env_ali_api_key(),
            # model='qwen-max-latest',
            # base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
            model='moonshot-v1-8k',
            api_key=get_env.load_env_moonshot_api_key(),
            base_url="https://api.moonshot.cn/v1",
            **kwargs
    ):
        ChatOpenAI.__init__(
            self,
            verbose=verbose,
            temperature=temperature,
            streaming=streaming,
            # api_key=get_env.load_env_glm_api_key(),
            api_key=api_key,
            model=model,
            base_url=base_url,
            cache=cache,
            **kwargs
        )

RunnableAssign

就是把输入的dict和返回的结果合到一起

# This is a RunnableAssign
from typing import Dict
from langchain_core.runnables.passthrough import (
    RunnableAssign,
    RunnableParallel,
)
from langchain_core.runnables.base import RunnableLambda
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter

from langchain_qingchen.chat.QingchenModel import QingchenModel


def add_ten(x: Dict[str, int]) -> Dict[str, int]:
    return {"added": x["input"] + 10}


mapper = RunnableParallel(
    {"add_step": RunnableLambda(add_ten)}
)
print(mapper.invoke({"input": 5}))
# {'add_step': {'added': 15}}

runnable_assign = RunnableAssign(mapper)

# Synchronous example
print(runnable_assign.invoke({"input": 5}))
# returns {'input': 5, 'add_step': {'added': 15}}

# Asynchronous example
# await runnable_assign.ainvoke({"input": 5})
# returns {'input': 5, 'add_step': {'added': 15}}


prompt = (
        SystemMessagePromptTemplate.from_template("You are a nice assistant.")
        + "{question}"
)
print(prompt)
# llm = FakeStreamingListLLM(responses=["foo-lish"])
llm = QingchenModel()

chain: Runnable = prompt | llm | {"str": StrOutputParser()}

print(chain.invoke({"question": "顺便生成一个笑话"}))

chain_with_assign = chain.assign(hello=itemgetter("str") | llm)
# 这里是这样的输入是{"question": "xxx"} -> {"str": "llm response"} -> (assgin会把输入也带下来)变成{"str": "llm response1","hello":"llm response2"}

# print(chain_with_assign.input_schema.model_json_schema())

# print(chain_with_assign.output_schema.model_json_schema())

response = chain_with_assign.invoke({"question": "顺便生成一个笑话"})
print(response)


# {'add_step': {'added': 15}}
# {'input': 5, 'add_step': {'added': 15}}
# input_variables=['question'] input_types={} partial_variables={} messages=[SystemMessagePromptTemplate(prompt=PromptTemplate(input_variables=[], input_types={}, partial_variables={}, template='You are a nice assistant.'), additional_kwargs={}), HumanMessagePromptTemplate(prompt=PromptTemplate(input_variables=['question'], input_types={}, partial_variables={}, template='{question}'), additional_kwargs={})]
# {'str': '当然可以,这里有一个幽默的笑话供您欣赏:\n\n有一天,一只狗走进了一个电脑商店,对售货员说:“给我来两台电脑,一台给我,另一台给我的猫。”\n\n售货员惊讶地问:“猫也需要电脑吗?”\n\n狗回答说:“是的,我的猫非常聪明,它可以一边上网冲浪,一边砸掉所有不喜欢它的老鼠。”\n\n希望这个笑话能让您开心一笑!'}
# {'str': '当然可以,这里有一个轻松幽默的笑话:\n\n有一天,一位电脑程序员去找他的医生并抱怨说:“医生,我觉得我得了抑郁症。我真的很不喜欢我的工作。”\n医生回答说:“为什么不尝试换个工作呢?比如,你可以尝试去动物园工作。”\n程序员疑惑地问:“但是我对动物一无所知啊。”\n医生笑着说:“没关系,你可以当一个人类的主键(Primary Key)。”\n\n笑话解释:在这个笑话中,“主键”是数据库术语,指的是一个能够区分数据库中每个记录的唯一标识符。这里医生用了一个双关语,暗示程序员虽然是人,但对动物知之甚少,所以在动物园里就像是一个用来区分不同人类的唯一标识符,即“人类的主键”。', 'hello': AIMessage(content='哈哈,这个笑话确实很有趣,它巧妙地结合了程序员的专业术语和日常生活,让人会心一笑。如果你想了解更多这样的笑话,或者有其他问题需要帮助,随时告诉我!', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 39, 'prompt_tokens': 157, 'total_tokens': 196, 'completion_tokens_details': None, 'prompt_tokens_details': None}, 'model_name': 'moonshot-v1-8k', 'system_fingerprint': None, 'id': 'chatcmpl-689232111e91f778fae383a1', 'service_tier': None, 'finish_reason': 'stop', 'logprobs': None}, id='run--d8b92cf9-9299-40cf-8b94-0c3655d9fd0e-0', usage_metadata={'input_tokens': 157, 'output_tokens': 39, 'total_tokens': 196, 'input_token_details': {}, 'output_token_details': {}})}

RunnableGenerator

接收一个迭代器,然后可以流式返回

# @Author  : ljl
# @Time    : 2025/1/17 下午2:44
import asyncio
from typing import Any, AsyncIterator, Iterator
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableGenerator, RunnableLambda
from langchain_core.output_parsers import StrOutputParser

from langchain_qingchen.chat.QingchenModel import QingchenModel


def gen(input: Iterator[Any]) -> Iterator[str]:
    for token in ["Have", " a", " nice", " day"]:
        yield token


runnable = RunnableGenerator(gen)
print(runnable.invoke(None))
print(list(runnable.stream(None)))  # ["Have", " a", " nice", " day"]
print(runnable.batch([None, None]))  # ['Have a nice day', 'Have a nice day']


# Async version:
async def agen(input: AsyncIterator[Any]) -> AsyncIterator[str]:
    for token in ["Have", " a", " nice", " day"]:
        yield token


async def main():
    runnable = RunnableGenerator(agen)
    result = await runnable.ainvoke(None)
    print(result)  # "Have a nice day"
    [print(p) async for p in runnable.astream(None)]  # ["Have", " a", " nice", " day"]


# 运行异步主函数
asyncio.run(main())

model = QingchenModel()
chant_chain = (
        ChatPromptTemplate.from_template("Give me a 3 word chant about {topic}")
        | model
        | StrOutputParser()
)


def character_generator(input: Iterator[str]) -> Iterator[str]:
    for token in input:
        if "," in token or "." in token:
            yield "👏" + token
        else:
            yield token


runnable = chant_chain | character_generator
assert type(runnable.last) is RunnableGenerator # 这里会自动封装成RunnableGenerator
print("".join(runnable.stream({"topic": "waste"})))


# Note that RunnableLambda can be used to delay streaming of one step in a
# sequence until the previous step is finished:
# RunnableLambda可用于延迟序列中某一步骤的流式处理,直到前一步骤完成
def reverse_generator(input: str) -> Iterator[str]:
    # Yield characters of input in reverse order.
    for character in input[::-1]:
        yield character


runnable = chant_chain | RunnableLambda(reverse_generator)
print("".join(runnable.stream({"topic": "waste"})))

# Have a nice day
# ['Have', ' a', ' nice', ' day']
# ['Have a nice day', 'Have a nice day']
# Have a nice day
# Have
#  a
#  nice
#  day
# Reduce👏, Reuse👏, Recycle
# elcyceR ,esueR ,ecudeR

RunnableLambda

我平时写用的很多,可以写一个方法用RunnableLambda包一下就能直接用上

from langchain_core.runnables import RunnableLambda
from langchain_core.globals import set_debug
from langchain_core.tracers import ConsoleCallbackHandler
import random

set_debug(True)


def add_one(x: int) -> int:
    return x + 1


def buggy_double(y: int) -> int:
    """Buggy code that will fail 70% of the time"""
    if random.random() > 0.3:
        print('This code failed, and will probably be retried!')  # noqa: T201
        raise ValueError('Triggered buggy code')
    return y * 2


sequence = (
        RunnableLambda(add_one) |
        RunnableLambda(buggy_double).with_retry(  # Retry on failure
            stop_after_attempt=10,
            wait_exponential_jitter=False
        )
)

print(sequence.input_schema.model_json_schema())  # Show inferred input schema
print(sequence.output_schema.model_json_schema())  # Show inferred output schema
# print(sequence.invoke(2, config={'callbacks': [ConsoleCallbackHandler()]}))  # invoke the sequence (note the retry above!!)
print(sequence.invoke(2))  # invoke the sequence (note the retry above!!)

# {'title': 'add_one_input', 'type': 'integer'}
# {'title': 'buggy_double_output', 'type': 'integer'}
# [chain/start] [chain:RunnableSequence] Entering Chain run with input:
# {
#   "input": 2
# }
# [chain/start] [chain:RunnableSequence > chain:add_one] Entering Chain run with input:
# {
#   "input": 2
# }
# [chain/end] [chain:RunnableSequence > chain:add_one] [0ms] Exiting Chain run with output:
# {
#   "output": 3
# }
# [chain/start] [chain:RunnableSequence > chain:buggy_double] Entering Chain run with input:
# {
#   "input": 3
# }
# [chain/start] [chain:RunnableSequence > chain:buggy_double > chain:buggy_double] Entering Chain run with input:
# {
#   "input": 3
# }
# [chain/end] [chain:RunnableSequence > chain:buggy_double > chain:buggy_double] [0ms] Exiting Chain run with output:
# {
#   "output": 6
# }
# [chain/end] [chain:RunnableSequence > chain:buggy_double] [1ms] Exiting Chain run with output:
# {
#   "output": 6
# }
# [chain/end] [chain:RunnableSequence] [2ms] Exiting Chain run with output:
# {
#   "output": 6
# }
# 6

RunnableParallel

并行

from langchain_core.runnables import RunnableLambda, RunnableParallel
from langchain_core.prompts import ChatPromptTemplate

from langchain_qingchen.chat.QingchenModel import QingchenModel


def add_one(x: int) -> int:
    return x + 1


def mul_two(x: int) -> int:
    return x * 2


def mul_three(x: int) -> int:
    return x * 3


runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
runnable_3 = RunnableLambda(mul_three)

sequence = runnable_1 | {  # this dict is coerced to a RunnableParallel
    "mul_two": runnable_2,
    "mul_three": runnable_3,
}
# Or equivalently:
# sequence = runnable_1 | RunnableParallel(
#     {"mul_two": runnable_2, "mul_three": runnable_3}
# )
# Also equivalently:
# sequence = runnable_1 | RunnableParallel(
#     mul_two=runnable_2,
#     mul_three=runnable_3,
# )

print(sequence.invoke(1))
# await sequence.ainvoke(1)

print(sequence.batch([1, 2, 3]))
# await sequence.abatch([1, 2, 3])


model = QingchenModel()
joke_chain = (
        ChatPromptTemplate.from_template("给我讲个关于{topic}的笑话")
        | model
)
poem_chain = (
        ChatPromptTemplate.from_template("写一首关于{topic}的两行诗")
        | model
)

runnable = RunnableParallel(joke=joke_chain, poem=poem_chain)

# Display stream
# output = {key: "" for key, _ in runnable.output_schema()}
for chunk in runnable.stream({"topic": "泳衣"}):
    for key in chunk:
        print(chunk)  # noqa: T201

# {'mul_two': 4, 'mul_three': 6}
# [{'mul_two': 4, 'mul_three': 6}, {'mul_two': 6, 'mul_three': 9}, {'mul_two': 8, 'mul_three': 12}]
# {'joke': AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='好的', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content=',', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='这里', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='有一个', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='关于', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='泳', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='衣', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='的', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='轻松', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='幽默', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='的', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='笑话', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content=':\n\n', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='有一天', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content=',', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='一位', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='顾客', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='走进', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='泳', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='衣', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='店', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='对', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='店员', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='说', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content=':“', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='我想', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='找', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='一套', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='既', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='时尚', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='又', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='实用的', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='泳', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='衣', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='。', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='”\n', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='店员', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='回答', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content=':“', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='当然', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content=',', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='我们', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='这里有', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='各种各样的', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='泳', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='衣', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='。', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='”\n', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='顾客', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='说', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content=':“', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='我想要', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='一款', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='即使', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='在水中', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='也不会', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='变得', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='透明的', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='泳', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='衣', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='。', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='”\n', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='店员', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='想了想', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content=',', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='然后', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='微笑着', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='说', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content=':“', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='那', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='您', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='可能', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='需要', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='一款', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='潜水', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='服', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='。”\n\n', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='这个', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='笑话', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='以', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='泳', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='衣', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='的材料', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='和', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='功能', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='为', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='切入点', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content=',', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='用', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='一个', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='幽默', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='的', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='转折', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='来', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='达到', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='幽默', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='效果', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='。', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='希望', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='这个', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='笑话', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='能', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='让您', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='会', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='心', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='一笑', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='!', additional_kwargs={}, response_metadata={}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'joke': AIMessageChunk(content='', additional_kwargs={}, response_metadata={'finish_reason': 'stop', 'model_name': 'moonshot-v1-8k', 'system_fingerprint': 'fpv0_ff52a3ef'}, id='run--b900033d-941d-426c-8359-fda5f825cbf9')}
# {'poem': AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run--a998b997-aca5-4de9-9281-dd63d96d89ad')}
# {'poem': AIMessageChunk(content='水中', additional_kwargs={}, response_metadata={}, id='run--a998b997-aca5-4de9-9281-dd63d96d89ad')}
# {'poem': AIMessageChunk(content='舞', additional_kwargs={}, response_metadata={}, id='run--a998b997-aca5-4de9-9281-dd63d96d89ad')}
# {'poem': AIMessageChunk(content='者', additional_kwargs={}, response_metadata={}, id='run--a998b997-aca5-4de9-9281-dd63d96d89ad')}
# {'poem': AIMessageChunk(content='披', additional_kwargs={}, response_metadata={}, id='run--a998b997-aca5-4de9-9281-dd63d96d89ad')}
# {'poem': AIMessageChunk(content='轻', additional_kwargs={}, response_metadata={}, id='run--a998b997-aca5-4de9-9281-dd63d96d89ad')}
# {'poem': AIMessageChunk(content='纱', additional_kwargs={}, response_metadata={}, id='run--a998b997-aca5-4de9-9281-dd63d96d89ad')}
# {'poem': AIMessageChunk(content=',\n', additional_kwargs={}, response_metadata={}, id='run--a998b997-aca5-4de9-9281-dd63d96d89ad')}
# {'poem': AIMessageChunk(content='碧', additional_kwargs={}, response_metadata={}, id='run--a998b997-aca5-4de9-9281-dd63d96d89ad')}
# {'poem': AIMessageChunk(content='波', additional_kwargs={}, response_metadata={}, id='run--a998b997-aca5-4de9-9281-dd63d96d89ad')}
# {'poem': AIMessageChunk(content='映', additional_kwargs={}, response_metadata={}, id='run--a998b997-aca5-4de9-9281-dd63d96d89ad')}
# {'poem': AIMessageChunk(content='日', additional_kwargs={}, response_metadata={}, id='run--a998b997-aca5-4de9-9281-dd63d96d89ad')}
# {'poem': AIMessageChunk(content='泳', additional_kwargs={}, response_metadata={}, id='run--a998b997-aca5-4de9-9281-dd63d96d89ad')}
# {'poem': AIMessageChunk(content='衣', additional_kwargs={}, response_metadata={}, id='run--a998b997-aca5-4de9-9281-dd63d96d89ad')}
# {'poem': AIMessageChunk(content='华', additional_kwargs={}, response_metadata={}, id='run--a998b997-aca5-4de9-9281-dd63d96d89ad')}
# {'poem': AIMessageChunk(content='。', additional_kwargs={}, response_metadata={}, id='run--a998b997-aca5-4de9-9281-dd63d96d89ad')}
# {'poem': AIMessageChunk(content='', additional_kwargs={}, response_metadata={'finish_reason': 'stop', 'model_name': 'moonshot-v1-8k', 'system_fingerprint': 'fpv0_ff52a3ef'}, id='run--a998b997-aca5-4de9-9281-dd63d96d89ad')}

RunnablePassthrough

这个用的也蛮多的

经常这么写:

RunnablePassthrough.assign(test=lambda x: len(x))
# test后面是个runnable的
# RunnablePassthrough会把前面字典带进来,assign把结果加进去,也就是说可以把第一步的模型返回带到第二步最终一起返回
import time

from langchain.globals import set_debug
from langchain_core.runnables import (
    RunnableLambda,
    RunnableParallel,
    RunnablePassthrough,
)

from langchain_qingchen.chat.QingchenModel import QingchenModel


# set_debug(True)
runnable = RunnableParallel(
    origin=RunnablePassthrough(),
    modified=lambda x: x + 1
)

print(runnable.invoke(1))


def fake_llm(prompt: str) -> str:  # Fake LLM for the example
    return "completion"


chain = RunnableLambda(fake_llm) | {
    'original': RunnablePassthrough(),  # Original LLM output
    'parsed': lambda text: text[::-1]  # Parsing logic
}

print(chain.invoke('hello'))

runnable = {
               'llm1': fake_llm,
               'llm2': fake_llm,
           } | RunnablePassthrough.assign(
    total_chars=lambda inputs: len(inputs['llm1'] + inputs['llm2'])
)

print(runnable.invoke('hello'))
# {'llm1': 'completion', 'llm2': 'completion', 'total_chars': 20}


# def time_wait(input):
#     print("sleeping")
#     time.sleep(2)
#
#
# def print_res(inputs):
#     print(inputs)
#
#
# chain2 = QingchenModel() | RunnablePassthrough(time_wait) | RunnablePassthrough(print_res)
# # print(chain2.invoke('你好'))
#
# messages = [
#     ("human", "开始"),
#     ("ai", "Hello, I am Bulbasaur. What's your name?"),
#     ("human", "Hello. My name is Pikachu."),
#     ("ai", "Nice to meet you, Pikachu."),
#     ("human", "What are you doing?"),
#     ("ai", "I'm exploring this area. What about you?"),
#     ("human", "I'm learning English")
# ]
# # print(chain2.invoke(messages))
# for e in chain2.stream(messages):
#     print(e)

# {'origin': 1, 'modified': 2}
# {'original': 'completion', 'parsed': 'noitelpmoc'}
# {'llm1': 'completion', 'llm2': 'completion', 'total_chars': 20}

RunnableWithMessageHistory

带对话记忆历史通过session区分不同的人

from typing import List
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import BaseMessage, AIMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from pydantic import BaseModel, Field
from langchain_core.runnables import (
    ConfigurableFieldSpec,
)
from langchain_core.runnables.history import RunnableWithMessageHistory

from langchain_qingchen.chat.QingchenModel import QingchenModel


class InMemoryHistory(BaseChatMessageHistory, BaseModel):
    """In memory implementation of chat message history."""

    messages: List[BaseMessage] = Field(default_factory=list)

    def add_messages(self, messages: List[BaseMessage]) -> None:
        """Add a list of messages to the store"""
        self.messages.extend(messages)

    def clear(self) -> None:
        self.messages = []


# Here we use a global variable to store the chat message history.
# This will make it easier to inspect it to see the underlying results.
store = {}


def get_by_session_id(session_id: str) -> BaseChatMessageHistory:
    if session_id not in store:
        store[session_id] = InMemoryHistory()
    return store[session_id]


history = get_by_session_id("1")
history.add_message(AIMessage(content="hello"))
print(store)  # noqa: T201

prompt = ChatPromptTemplate.from_messages([
    ("system", "You're an assistant who's good at {ability}"),
    MessagesPlaceholder(variable_name="history"),
    ("human", "{question}"),
])

chain = prompt | QingchenModel()

chain_with_history = RunnableWithMessageHistory(
    chain,
    get_by_session_id,
    input_messages_key="question",
    history_messages_key="history",
)

print(chain_with_history.invoke(  # noqa: T201
    {"ability": "math", "question": "What does cosine mean?"},
    config={"configurable": {"session_id": "foo"}}
))

# Uses the store defined in the example above.
print(store)  # noqa: T201

print(chain_with_history.invoke(  # noqa: T201
    {"ability": "math", "question": "What's its inverse"},
    config={"configurable": {"session_id": "foo"}}
))

print(store)  # noqa: T201


store = {}


def get_session_history(
        user_id: str, conversation_id: str
) -> BaseChatMessageHistory:
    if (user_id, conversation_id) not in store:
        store[(user_id, conversation_id)] = InMemoryHistory()
    return store[(user_id, conversation_id)]


prompt = ChatPromptTemplate.from_messages([
    ("system", "You're an assistant who's good at {ability}"),
    MessagesPlaceholder(variable_name="history"),
    ("human", "{question}"),
])

chain = prompt | QingchenModel()

with_message_history = RunnableWithMessageHistory(
    chain,
    get_session_history=get_session_history,
    input_messages_key="question",
    history_messages_key="history",
    history_factory_config=[
        ConfigurableFieldSpec(
            id="user_id",
            annotation=str,
            name="User ID",
            description="Unique identifier for the user.",
            default="",
            is_shared=True,
        ),
        ConfigurableFieldSpec(
            id="conversation_id",
            annotation=str,
            name="Conversation ID",
            description="Unique identifier for the conversation.",
            default="",
            is_shared=True,
        ),
    ],
)

with_message_history.invoke(
    {"ability": "math", "question": "What does cosine mean?"},
    config={"configurable": {"user_id": "123", "conversation_id": "1"}}
)

FakeModel

待补充

伪造一个大模型返回来做测试

def fake_model(input) -> Iterator[ChatGenerationChunk]:
    print("into fake_model")
    for c in content:
        yield AIMessageChunk(c)
        
full_chain = ((RunnableLambda(fake_model) | JsonOutputParser()).with_retry(stop_after_attempt=3)
              | RunnablePassthrough.assign(
            test_summary=itemgetter("input") | RunnableLambda(lambda x: x)
        ))

CustomJsonOutputParser

JsonOutputParser实际使用中有一个问题如果json格式有问题直接就返回none

我们可以继承然后改一下

import random
from json import JSONDecodeError
from typing import Iterator, Optional, Callable, Any
from langchain_core.messages import AIMessageChunk
from langchain_core.outputs import ChatGenerationChunk, Generation
from langchain_core.runnables import RunnableLambda
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.utils.json import parse_json_markdown

content1 = """{\n    "target": "进行简单的英语对话交流",\n    "roles": [\n        "Bulbasaur",\n        "Pikachu"\n    ],\n    "link": [\n        "环节1:Bulbasaur:‘Hello, I am Bulbasaur. What\'s your name?’  Pikachu:‘Hello. My name is Pikachu.’",\n        "环节2:Bulbasaur:‘Nice to meet you, Pikachu.’  Pikachu:‘Nice to meet you too.’",\n        "环节3:Bulbasaur:‘I\'m exploring this area. What about you?’  Pikachu:‘What are you doing?’",\n        "结束环节:Bulbasaur:‘Well, see you next time! <结束>’  Pikachu:‘bye’"\n    ],\n    "scene": "In a lush forest, Bulbasaur and Pikachu meet. Bulbasaur is looking around curiously when he sees Pikachu and says:",\n    "actor_line": "Well, see you next time!",\n    "prompt": "Bulbasaur in a forest, looking a bit disappointed, high quality"\n} """

content2 = """how are you?你好吗?"""


def fake_model(input) -> Iterator[ChatGenerationChunk]:
    print("into fake_model")
    # 随机选择一个content
    content = content1 if random.random() > 0.5 else content2
    for c in content:
        yield AIMessageChunk(c)


test_chain = fake_model | JsonOutputParser()
# print(test_chain.invoke("hello"))


# 自定义输出解析器
class CustomJsonOutputParser(JsonOutputParser):
    """
    自定义JSON输出解析器,支持JSON解析和自定义回退逻辑
    """

    def __init__(
            self,
            pydantic_object: Optional[Any] = None,
            fallback_func: Optional[Callable[[str], dict]] = None
    ):
        """
        初始化自定义JSON输出解析器

        :param pydantic_object: 可选的Pydantic模型
        :param fallback_func: 当无法解析JSON时的自定义处理函数
        """
        # 使用父类构造函数
        super().__init__(pydantic_object=pydantic_object)
        # 存储回退函数
        self._fallback_func = fallback_func

    def parse_result(self, result: list[Generation], *, partial: bool = False) -> Any:
        text = result[0].text
        text = text.strip()
        if partial:
            try:
                return parse_json_markdown(text)
            except JSONDecodeError:
                # 不是json格式返回时
                return self._fallback_func(text)
        else:
            try:
                return parse_json_markdown(text)
            except JSONDecodeError as e:
                # msg = f"Invalid json output: {text}"
                return self._fallback_func(text)


# 自定义回退函数示例
def custom_fallback(text: str) -> dict:
    """
    自定义回退处理函数

    :param text: 原始文本
    :return: 处理后的字典
    """
    return {
        "original_text": text,
        "processed": text.upper(),
        "length": len(text),
        "parse_status": "custom_fallback"
    }


# 创建解析器实例
parser = CustomJsonOutputParser(fallback_func=custom_fallback)
chain = RunnableLambda(fake_model) | parser

# for e in chain.stream("用json格式输出:{'a':1}"):
#     # print(type(e))
#     print(e)

print(chain.invoke("用json格式输出:{'a':1}"))

待补充

Langserve

通过fastapi加上langserve可以快速搭建工作流

#!/usr/bin/env python
from fastapi import FastAPI
from langchain.prompts import ChatPromptTemplate
from langserve import add_routes, RemoteRunnable
from pydantic import BaseModel

from langchain_qingchen.chat.QingchenModel import QingchenModel


class InputClass(BaseModel):
    userinfo: dict


app = FastAPI(
    title="LangChain Server",
    version="1.0",
    description="A simple api server using Langchain's Runnable interfaces",
)

add_routes(
    app,
    QingchenModel(),
    path="/qingchen",
)

add_routes(
    app,
    RemoteRunnable("http://192.168.1.9:8000/joke/").with_types(input_type=InputClass),
    path="/new/joke"
)

model = QingchenModel()
prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
add_routes(
    app,
    prompt | model,
    path="/joke",
)

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="localhost", port=8000)

待补充

Langgraph

快速搭建

# @Author  : ljl
# @Time    : 2025/1/25 下午3:30
# Import relevant functionality
from datetime import datetime

from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent

from langchain_qingchen.chat.QingchenModel import QingchenModel

# Create the agent
memory = MemorySaver()
model = QingchenModel()


@tool
def get_time():
    """
    获取当前时间
    :return: 当前时间
    """
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")


@tool
def get_location():
    """
    获取用户当前位置
    :return: 用户当前位置
    """
    return '南京市浦口区'


tools = [get_time, get_location]
agent_executor = create_react_agent(model, tools, checkpointer=memory)

# Use the agent
config = {"configurable": {"thread_id": "abc123"}}
for chunk in agent_executor.stream(
        {"messages": [HumanMessage(content="我是清尘,我现在在哪?")]}, config
):
    print(chunk)
    print("----")

for chunk in agent_executor.stream(
        {"messages": [HumanMessage(content="南京现在几点了")]}, config
):
    print(chunk)
    print("----")


# {'agent': {'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': '01987b44c25a51af954a59a546322a55', 'function': {'arguments': '{}', 'name': 'get_location'}, 'type': 'function', 'index': 0}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 2, 'prompt_tokens': 131, 'total_tokens': 133, 'completion_tokens_details': None, 'prompt_tokens_details': None}, 'model_name': 'deepseek-ai/DeepSeek-V3', 'system_fingerprint': '', 'id': '01987b44bcbf607a03000fd17ff8b722', 'service_tier': None, 'finish_reason': 'tool_calls', 'logprobs': None}, id='run--1e85b1da-2476-4034-9258-c05af6e8e796-0', tool_calls=[{'name': 'get_location', 'args': {}, 'id': '01987b44c25a51af954a59a546322a55', 'type': 'tool_call'}], usage_metadata={'input_tokens': 131, 'output_tokens': 2, 'total_tokens': 133, 'input_token_details': {}, 'output_token_details': {}})]}}
# ----
# {'tools': {'messages': [ToolMessage(content='南京市浦口区', name='get_location', id='499460a1-383e-4df2-a6e8-33a73af37bc9', tool_call_id='01987b44c25a51af954a59a546322a55')]}}
# ----
# {'agent': {'messages': [AIMessage(content='清尘,你现在的位置是在南京市浦口区。', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 11, 'prompt_tokens': 156, 'total_tokens': 167, 'completion_tokens_details': None, 'prompt_tokens_details': None}, 'model_name': 'deepseek-ai/DeepSeek-V3', 'system_fingerprint': '', 'id': '01987b44c4766cd3fe401958ff480d4b', 'service_tier': None, 'finish_reason': 'stop', 'logprobs': None}, id='run--74a43175-bca4-4b10-81f3-14d9fdf6b2dd-0', usage_metadata={'input_tokens': 156, 'output_tokens': 11, 'total_tokens': 167, 'input_token_details': {}, 'output_token_details': {}})]}}
# ----
# {'agent': {'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': '01987b44d9c94236ea31c9ab626358b5', 'function': {'arguments': '{}', 'name': 'get_time'}, 'type': 'function', 'index': 0}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 2, 'prompt_tokens': 174, 'total_tokens': 176, 'completion_tokens_details': None, 'prompt_tokens_details': None}, 'model_name': 'deepseek-ai/DeepSeek-V3', 'system_fingerprint': '', 'id': '01987b44cfca41732375fcabbdd2e0ff', 'service_tier': None, 'finish_reason': 'tool_calls', 'logprobs': None}, id='run--5a140a15-728c-4c7d-a576-1546c126f236-0', tool_calls=[{'name': 'get_time', 'args': {}, 'id': '01987b44d9c94236ea31c9ab626358b5', 'type': 'tool_call'}], usage_metadata={'input_tokens': 174, 'output_tokens': 2, 'total_tokens': 176, 'input_token_details': {}, 'output_token_details': {}})]}}
# ----
# {'tools': {'messages': [ToolMessage(content='2025-08-06 01:26:07', name='get_time', id='a4b538c2-780c-401c-89e6-2c255e5ac4f8', tool_call_id='01987b44d9c94236ea31c9ab626358b5')]}}
# ----
# {'agent': {'messages': [AIMessage(content='南京现在是2025年8月6日凌晨1点26分。', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 14, 'prompt_tokens': 207, 'total_tokens': 221, 'completion_tokens_details': None, 'prompt_tokens_details': None}, 'model_name': 'deepseek-ai/DeepSeek-V3', 'system_fingerprint': '', 'id': '01987b44dc553d3469e67f211c2c2e9d', 'service_tier': None, 'finish_reason': 'stop', 'logprobs': None}, id='run--5a662040-3c10-491f-bd04-a3c203f38b25-0', usage_metadata={'input_tokens': 207, 'output_tokens': 14, 'total_tokens': 221, 'input_token_details': {}, 'output_token_details': {}})]}}
# ----

分步搭建

工具

主要是用convert_to_openai_tool转成对应的格式

# @Time    : 2025/1/25 下午2:12
from datetime import datetime
from langchain_core.tools import tool
from typing import List
from langchain_core.utils.function_calling import convert_to_openai_tool
from utils.bingsearch import run_bingsearch_workflow


# 20250325 优化图片搜索工具
@tool
def image_bing_search(keyword: str, count: int, offset: int) -> List[str]:
    """
    使用bing搜图工具
    :param keyword: 搜图提示词
    :param count: 搜索结果数量
    :param offset: 搜索结果偏移量,用于翻页,默认为0-30的随机整数
    :return: 搜图结果是图片链接列表
    """
    import random
    if not offset:
        offset = random.randint(0, 30)
    return run_bingsearch_workflow(keyword, count, offset)


@tool
def current_time() -> str:
    """
    获取当前时间
    :return: 当前时间
    """
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")


image_bing_search_tool = convert_to_openai_tool(image_bing_search)
current_time_tool = convert_to_openai_tool(current_time)

tools = [image_bing_search, current_time]
TOOLS = [image_bing_search_tool, current_time_tool]

模型

关键在于bind绑定工具

# @Time    : 2025/2/8 上午10:02
from assistant.tools import TOOLS
from langchain_qingchen.chat.QingchenModel import QingchenModel

llm_with_tools = QingchenModel(tags=["show", "tools", "agent"]).bind(tools=TOOLS)

构建图

# @Author  : ljl
# @Time    : 2025/1/25 下午1:48
# 正常对话包含历史记录缓存,包含一个搜图的工具
# langgraph实现
from typing import Annotated
from langchain_core.messages import BaseMessage, ToolMessage, AIMessageChunk, SystemMessage
from langgraph.constants import END
from langgraph.prebuilt import ToolNode, tools_condition
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
from assistant.assistant_constants import *
from assistant.model_with_tools import llm_with_tools
from assistant.quick_instructions import quick_instructions_pic, quick_instructions_exercise, quick_instructions_advice, quick_instructions_lesson
from assistant.tools import tools
from utils.mylog import logger


# 入参
class AssistantState(TypedDict):
    messages: Annotated[list, add_messages]
    thread_uuid: Annotated[str, "临时对话线程UUID"]
    quick_instructions: Annotated[bool, "是否是快捷指令"]


# 入口
def entry(state: AssistantState):
    logger.info(f"AI Assistant Entry: {state}")


# 模型
def chatbot(state: AssistantState):
    dialogue: list[BaseMessage] = state["messages"]
    # 添加系统提示词
    dialogue.insert(0, SystemMessage(content=system_prompt))
    return {"messages": [llm_with_tools.invoke(dialogue)]}


def get_last_message(state: AssistantState) -> BaseMessage:
    if messages := state.get("messages", []):
        message: BaseMessage = messages[-1]
    else:
        raise ValueError(f"没有收到用户消息: {state}")
    return message


# 快捷指令
def quick_instructions(state: AssistantState):
    message = get_last_message(state)
    logger.info(f"AI Assistant Quick Instructions: {message}")
    lesson_content = state["lesson_content"]
    if message.content == Q1: 
        return {"messages": [quick_instructions_pic(lesson_content)], "quick_instructions": False}
    elif message.content == Q2: 
        return {"messages": [quick_instructions_exercise(lesson_content)], "quick_instructions": False}
    elif message.content == Q3: 
        return {"messages": [quick_instructions_advice(lesson_content)], "quick_instructions": False}
    elif Q4 in message.content: 
        return {"messages": [quick_instructions_lesson(lesson_content)], "quick_instructions": False}
    else:
        return {"messages": [AIMessageChunk(content="触发了快捷指令,但未找到对应的指令")], "quick_instructions": False}


# 入口路由
def entry_route(state: AssistantState):
    """
    快捷指令判断路由,双重校验
    """
    message = get_last_message(state)

    # 第一层校验
    if state.get("quick_instructions", False):
        # 第二层校验
        if trigger_word in message.content:
            # 处理快捷指令
            return "quick"
    return "chatbot"


# 图
def build_graph():
    # tool
    tool_node = ToolNode(tools=tools)

    # memory
    memory = MemorySaver()

    # build graph
    graph_builder = StateGraph(AssistantState)

    graph_builder.add_node("entry", entry)
    graph_builder.add_node("quick", quick_instructions)
    graph_builder.add_node("chatbot", chatbot)
    graph_builder.add_node("tools", tool_node)

    graph_builder.add_conditional_edges(
        "entry",
        entry_route,
        {"chatbot": "chatbot", "quick": "quick"},
    )

    graph_builder.add_conditional_edges(
        "chatbot",
        tools_condition
    )

    graph_builder.add_edge(START, "entry")
    graph_builder.add_edge("tools", "chatbot")
    graph_builder.add_edge("quick", END)

    agent_graph = graph_builder.compile(checkpointer=memory)
    return agent_graph


graph = build_graph().with_config({"run_name": "ai_assistant"})


# 流式
async def assistant_chat_stream(inputs: AssistantState):
    config = {"configurable": {"thread_id": inputs.get("thread_uuid")}}
    async for message, metadata in graph.astream(inputs, config, stream_mode="messages"):
        message: BaseMessage = message
        if "show" not in metadata.get("tags", []):
            # logger.info(f"AI Assistant Hide Response: {message}")
            continue
        if isinstance(message, ToolMessage):
            logger.info(f"AI Assistant Tool Message: {message}")
            continue
        elif isinstance(message, AIMessageChunk):
            if message.tool_calls:
                logger.info(f"AI Assistant Tool Calls: {message}")
                continue
        yield message

# 展示graph结构
# def show_graph():
#     from PIL import Image
#     from io import BytesIO
#     import matplotlib.pyplot as plt
#     from langchain_core.runnables.graph import MermaidDrawMethod
#
#     def show_img(path) -> None:
#         img = Image.open(path)
#         plt.axis('off')  # 不显示坐标轴
#         plt.imshow(img)  # 将数据显示为图像,即在二维常规光栅上。
#         plt.show()  # 显示图片
#
#     show_img(BytesIO(graph.get_graph(xray=True).draw_mermaid_png(draw_method=MermaidDrawMethod.PYPPETEER)))
#
#
# show_graph()

可以结合langserve快速起一个api

add_routes(
    assistant_router,
    RunnableLambda(assistant_chat_stream).with_types(input_type=AssistantState),
    path="/ai/assistant",
)

待补充