Custom Chat Model
In this guide, we’ll learn how to create a custom chat model using LangChain abstractions.
Wrapping your LLM with the standard ChatModel
interface allow you to
use your LLM in existing LangChain programs with minimal code
modifications!
As an bonus, your LLM will automatically become a LangChain Runnable
and will benefit from some optimizations out of the box (e.g., batch via
a threadpool), async support, the astream_events
API, etc.
Inputs and outputs
First, we need to talk about messages which are the inputs and outputs of chat models.
Messages
Chat models take messages as inputs and return a message as output.
LangChain has a few built-in message types:
SystemMessage
: Used for priming AI behavior, usually passed in as the first of a sequence of input messages.HumanMessage
: Represents a message from a person interacting with the chat model.AIMessage
: Represents a message from the chat model. This can be either text or a request to invoke a tool.FunctionMessage
/ToolMessage
: Message for passing the results of tool invocation back to the model.
ToolMessage
and FunctionMessage
closely follow OpenAIs function
and tool
arguments.
This is a rapidly developing field and as more models add function calling capabilities, expect that there will be additions to this schema.
from langchain_core.messages import (
AIMessage,
BaseMessage,
FunctionMessage,
HumanMessage,
SystemMessage,
ToolMessage,
)
Streaming Variant
All the chat messages have a streaming variant that contains Chunk
in
the name.
from langchain_core.messages import (
AIMessageChunk,
FunctionMessageChunk,
HumanMessageChunk,
SystemMessageChunk,
ToolMessageChunk,
)
These chunks are used when streaming output from chat models, and they all define an additive property!
AIMessageChunk(content="Hello") + AIMessageChunk(content=" World!")
AIMessageChunk(content='Hello World!')
Simple Chat Model
Inherting from SimpleChatModel
is great for prototyping!
It won’t allow you to implement all features that you might want out of
a chat model, but it’s quick to implement, and if you need more you can
transition to BaseChatModel
shown below.
Let’s implement a chat model that echoes back the last n
characters of
the prompt!
You need to implement the following:
- The method
_call
- Use to generate a chat result from a prompt.
In addition, you have the option to specify the following:
- The property
_identifying_params
- Represent model parameterization for logging purposes.
Optional:
_stream
- Use to implement streaming.
Base Chat Model
Let’s implement a chat model that echoes back the first n
characetrs
of the last message in the prompt!
To do so, we will inherit from BaseChatModel
and we’ll need to
implement the following methods/properties:
In addition, you have the option to specify the following:
To do so inherit from BaseChatModel
which is a lower level class and
implement the methods:
_generate
- Use to generate a chat result from a prompt- The property
_llm_type
- Used to uniquely identify the type of the model. Used for logging.
Optional:
_stream
- Use to implement streaming._agenerate
- Use to implement a native async method._astream
- Use to implement async version of_stream
.- The property
_identifying_params
- Represent model parameterization for logging purposes.
Currently, to get async streaming to work (via astream
), you must
provide an implementation of _astream
.
By default if _astream
is not provided, then async streaming falls
back on _agenerate
which does not support token by token streaming.
Implementation
from typing import Any, AsyncIterator, Dict, Iterator, List, Optional
from langchain_core.callbacks import (
AsyncCallbackManagerForLLMRun,
CallbackManagerForLLMRun,
)
from langchain_core.language_models import BaseChatModel, SimpleChatModel
from langchain_core.messages import AIMessageChunk, BaseMessage, HumanMessage
from langchain_core.outputs import ChatGeneration, ChatGenerationChunk, ChatResult
from langchain_core.runnables import run_in_executor
class CustomChatModelAdvanced(BaseChatModel):
"""A custom chat model that echoes the first `n` characters of the input.
When contributing an implementation to LangChain, carefully document
the model including the initialization parameters, include
an example of how to initialize the model and include any relevant
links to the underlying models documentation or API.
Example:
.. code-block:: python
model = CustomChatModel(n=2)
result = model.invoke([HumanMessage(content="hello")])
result = model.batch([[HumanMessage(content="hello")],
[HumanMessage(content="world")]])
"""
n: int
"""The number of characters from the last message of the prompt to be echoed."""
def _generate(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> ChatResult:
"""Override the _generate method to implement the chat model logic.
This can be a call to an API, a call to a local model, or any other
implementation that generates a response to the input prompt.
Args:
messages: the prompt composed of a list of messages.
stop: a list of strings on which the model should stop generating.
If generation stops due to a stop token, the stop token itself
SHOULD BE INCLUDED as part of the output. This is not enforced
across models right now, but it's a good practice to follow since
it makes it much easier to parse the output of the model
downstream and understand why generation stopped.
run_manager: A run manager with callbacks for the LLM.
"""
last_message = messages[-1]
tokens = last_message.content[: self.n]
message = AIMessage(content=tokens)
generation = ChatGeneration(message=message)
return ChatResult(generations=[generation])
def _stream(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> Iterator[ChatGenerationChunk]:
"""Stream the output of the model.
This method should be implemented if the model can generate output
in a streaming fashion. If the model does not support streaming,
do not implement it. In that case streaming requests will be automatically
handled by the _generate method.
Args:
messages: the prompt composed of a list of messages.
stop: a list of strings on which the model should stop generating.
If generation stops due to a stop token, the stop token itself
SHOULD BE INCLUDED as part of the output. This is not enforced
across models right now, but it's a good practice to follow since
it makes it much easier to parse the output of the model
downstream and understand why generation stopped.
run_manager: A run manager with callbacks for the LLM.
"""
last_message = messages[-1]
tokens = last_message.content[: self.n]
for token in tokens:
chunk = ChatGenerationChunk(message=AIMessageChunk(content=token))
if run_manager:
run_manager.on_llm_new_token(token, chunk=chunk)
yield chunk
async def _astream(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> AsyncIterator[ChatGenerationChunk]:
"""An async variant of astream.
If not provided, the default behavior is to delegate to the _generate method.
The implementation below instead will delegate to `_stream` and will
kick it off in a separate thread.
If you're able to natively support async, then by all means do so!
"""
result = await run_in_executor(
None,
self._stream,
messages,
stop=stop,
run_manager=run_manager.get_sync() if run_manager else None,
**kwargs,
)
for chunk in result:
yield chunk
@property
def _llm_type(self) -> str:
"""Get the type of language model used by this chat model."""
return "echoing-chat-model-advanced"
@property
def _identifying_params(self) -> Dict[str, Any]:
"""Return a dictionary of identifying parameters."""
return {"n": self.n}
The _astream
implementation uses run_in_executor
to launch the sync
_stream
in a separate thread.
You can use this trick if you want to reuse the _stream
implementation, but if you’re able to implement code that’s natively
async that’s a better solution since that code will run with less
overhead.
Let’s test it 🧪
The chat model will implement the standard Runnable
interface of
LangChain which many of the LangChain abstractions support!
model = CustomChatModelAdvanced(n=3)
model.invoke(
[
HumanMessage(content="hello!"),
AIMessage(content="Hi there human!"),
HumanMessage(content="Meow!"),
]
)
AIMessage(content='Meo')
model.invoke("hello")
AIMessage(content='hel')
model.batch(["hello", "goodbye"])
[AIMessage(content='hel'), AIMessage(content='goo')]
for chunk in model.stream("cat"):
print(chunk.content, end="|")
c|a|t|
Please see the implementation of _astream
in the model! If you do not
implement it, then no output will stream.!
async for chunk in model.astream("cat"):
print(chunk.content, end="|")
c|a|t|
Let’s try to use the astream events API which will also help double check that all the callbacks were implemented!
async for event in model.astream_events("cat", version="v1"):
print(event)
{'event': 'on_chat_model_start', 'run_id': 'e03c0b21-521f-4cb4-a837-02fed65cf1cf', 'name': 'CustomChatModelAdvanced', 'tags': [], 'metadata': {}, 'data': {'input': 'cat'}}
{'event': 'on_chat_model_stream', 'run_id': 'e03c0b21-521f-4cb4-a837-02fed65cf1cf', 'tags': [], 'metadata': {}, 'name': 'CustomChatModelAdvanced', 'data': {'chunk': AIMessageChunk(content='c')}}
{'event': 'on_chat_model_stream', 'run_id': 'e03c0b21-521f-4cb4-a837-02fed65cf1cf', 'tags': [], 'metadata': {}, 'name': 'CustomChatModelAdvanced', 'data': {'chunk': AIMessageChunk(content='a')}}
{'event': 'on_chat_model_stream', 'run_id': 'e03c0b21-521f-4cb4-a837-02fed65cf1cf', 'tags': [], 'metadata': {}, 'name': 'CustomChatModelAdvanced', 'data': {'chunk': AIMessageChunk(content='t')}}
{'event': 'on_chat_model_end', 'name': 'CustomChatModelAdvanced', 'run_id': 'e03c0b21-521f-4cb4-a837-02fed65cf1cf', 'tags': [], 'metadata': {}, 'data': {'output': AIMessageChunk(content='cat')}}
/home/eugene/src/langchain/libs/core/langchain_core/_api/beta_decorator.py:86: LangChainBetaWarning: This API is in beta and may change in the future.
warn_beta(
Identifying Params
LangChain has a callback system which allows implementing loggers to monitor the behavior of LLM applications.
Remember the _identifying_params
property from earlier?
It’s passed to the callback system and is accessible for user specified loggers.
Below we’ll implement a handler with just a single on_chat_model_start
event to see where _identifying_params
appears.
from typing import Union
from uuid import UUID
from langchain_core.callbacks import AsyncCallbackHandler
from langchain_core.outputs import (
ChatGenerationChunk,
ChatResult,
GenerationChunk,
LLMResult,
)
class SampleCallbackHandler(AsyncCallbackHandler):
"""Async callback handler that handles callbacks from LangChain."""
async def on_chat_model_start(
self,
serialized: Dict[str, Any],
messages: List[List[BaseMessage]],
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
tags: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> Any:
"""Run when a chat model starts running."""
print("---")
print("On chat model start.")
print(kwargs)
model.invoke("meow", stop=["woof"], config={"callbacks": [SampleCallbackHandler()]})
---
On chat model start.
{'invocation_params': {'n': 3, '_type': 'echoing-chat-model-advanced', 'stop': ['woof']}, 'options': {'stop': ['woof']}, 'name': None, 'batch_size': 1}
AIMessage(content='meo')
Contributing
We appreciate all chat model integration contributions.
Here’s a checklist to help make sure your contribution gets added to LangChain:
Documentation:
- The model contains doc-strings for all initialization arguments, as these will be surfaced in the APIReference.
- The class doc-string for the model contains a link to the model API if the model is powered by a service.
Tests:
- ☐ Add unit or integration tests to the overridden methods. Verify
that
invoke
,ainvoke
,batch
,stream
work if you’ve over-ridden the corresponding code.
Streaming (if you’re implementing it):
- ☐ Provided an async implementation via
_astream
- ☐ Make sure to invoke the
on_llm_new_token
callback - ☐
on_llm_new_token
is invoked BEFORE yielding the chunk
Stop Token Behavior:
- ☐ Stop token should be respected
- ☐ Stop token should be INCLUDED as part of the response
Secret API Keys:
- ☐ If your model connects to an API it will likely accept API keys as
part of its initialization. Use Pydantic’s
SecretStr
type for secrets, so they don’t get accidentally printed out when folks print the model.