
Custom Chat Model

In this guide, we’ll learn how to create a custom chat model using LangChain abstractions.

Wrapping your LLM with the standard ChatModel interface allows you to use your LLM in existing LangChain programs with minimal code modifications!

As a bonus, your LLM will automatically become a LangChain Runnable and will benefit from some optimizations out of the box (e.g., batch via a threadpool), async support, the astream_events API, etc.
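To make the "batch via a threadpool" idea concrete, here is a minimal standard-library sketch. It is not LangChain's implementation; batch_with_threads and echo_first_three are hypothetical names used only for illustration. It assumes that each call is independent and that results should come back in input order.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def batch_with_threads(
    invoke: Callable[[str], str], inputs: List[str], max_workers: int = 4
) -> List[str]:
    """Run `invoke` on every input in a thread pool, preserving input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map returns results in the order of the inputs,
        # even when the calls finish out of order.
        return list(pool.map(invoke, inputs))


def echo_first_three(prompt: str) -> str:
    """Stand-in for a single (possibly slow, I/O-bound) model call."""
    return prompt[:3]


results = batch_with_threads(echo_first_three, ["hello", "goodbye"])
print(results)  # ['hel', 'goo']
```

A thread pool helps most when each call spends its time waiting on I/O, such as an HTTP request to a model API.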

Inputs and outputs

First, we need to talk about messages, which are the inputs and outputs of chat models.

Messages

Chat models take messages as inputs and return a message as output.

LangChain has a few built-in message types:

  • SystemMessage: Used for priming AI behavior, usually passed in as the first of a sequence of input messages.
  • HumanMessage: Represents a message from a person interacting with the chat model.
  • AIMessage: Represents a message from the chat model. This can be either text or a request to invoke a tool.
  • FunctionMessage / ToolMessage: Message for passing the results of tool invocation back to the model.
note

ToolMessage and FunctionMessage closely follow OpenAI’s function and tool arguments.

This is a rapidly developing field and as more models add function calling capabilities, expect that there will be additions to this schema.

from langchain_core.messages import (
    AIMessage,
    BaseMessage,
    FunctionMessage,
    HumanMessage,
    SystemMessage,
    ToolMessage,
)

Streaming Variant

All the chat messages have a streaming variant that contains Chunk in the name.

from langchain_core.messages import (
    AIMessageChunk,
    FunctionMessageChunk,
    HumanMessageChunk,
    SystemMessageChunk,
    ToolMessageChunk,
)

These chunks are used when streaming output from chat models, and they all define an additive property!

AIMessageChunk(content="Hello") + AIMessageChunk(content=" World!")
AIMessageChunk(content='Hello World!')

Simple Chat Model

Inheriting from SimpleChatModel is great for prototyping!

It won’t allow you to implement all features that you might want out of a chat model, but it’s quick to implement, and if you need more you can transition to BaseChatModel shown below.

Let’s implement a chat model that echoes back the last n characters of the prompt!

You need to implement the following:

  • The method _call - Used to generate a chat result from a prompt.

In addition, you have the option to specify the following:

  • The property _identifying_params - Represents model parameterization for logging purposes.

Optional:

  • _stream - Used to implement streaming.

Base Chat Model

Let’s implement a chat model that echoes back the first n characters of the last message in the prompt!

To do so, we will inherit from BaseChatModel, which is a lower-level class, and implement the following methods/properties:

  • _generate - Used to generate a chat result from a prompt.
  • The property _llm_type - Used to uniquely identify the type of the model. Used for logging.

Optional:

  • _stream - Used to implement streaming.
  • _agenerate - Used to implement a native async method.
  • _astream - Used to implement an async version of _stream.
  • The property _identifying_params - Represents model parameterization for logging purposes.
caution

Currently, to get async streaming to work (via astream), you must provide an implementation of _astream.

By default, if _astream is not provided, async streaming falls back on _agenerate, which does not support token-by-token streaming.

Implementation

from typing import Any, AsyncIterator, Dict, Iterator, List, Optional

from langchain_core.callbacks import (
    AsyncCallbackManagerForLLMRun,
    CallbackManagerForLLMRun,
)
from langchain_core.language_models import BaseChatModel, SimpleChatModel
from langchain_core.messages import (
    AIMessage,
    AIMessageChunk,
    BaseMessage,
    HumanMessage,
)
from langchain_core.outputs import ChatGeneration, ChatGenerationChunk, ChatResult
from langchain_core.runnables import run_in_executor


class CustomChatModelAdvanced(BaseChatModel):
    """A custom chat model that echoes the first `n` characters of the input.

    When contributing an implementation to LangChain, carefully document
    the model including the initialization parameters, include
    an example of how to initialize the model and include any relevant
    links to the underlying model's documentation or API.

    Example:

        .. code-block:: python

            model = CustomChatModelAdvanced(n=2)
            result = model.invoke([HumanMessage(content="hello")])
            result = model.batch([[HumanMessage(content="hello")],
                                 [HumanMessage(content="world")]])
    """

    n: int
    """The number of characters from the last message of the prompt to be echoed."""

    def _generate(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        """Override the _generate method to implement the chat model logic.

        This can be a call to an API, a call to a local model, or any other
        implementation that generates a response to the input prompt.

        Args:
            messages: the prompt composed of a list of messages.
            stop: a list of strings on which the model should stop generating.
                  If generation stops due to a stop token, the stop token itself
                  SHOULD BE INCLUDED as part of the output. This is not enforced
                  across models right now, but it's a good practice to follow since
                  it makes it much easier to parse the output of the model
                  downstream and understand why generation stopped.
            run_manager: A run manager with callbacks for the LLM.
        """
        last_message = messages[-1]
        tokens = last_message.content[: self.n]
        message = AIMessage(content=tokens)
        generation = ChatGeneration(message=message)
        return ChatResult(generations=[generation])

    def _stream(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Iterator[ChatGenerationChunk]:
        """Stream the output of the model.

        This method should be implemented if the model can generate output
        in a streaming fashion. If the model does not support streaming,
        do not implement it. In that case streaming requests will be automatically
        handled by the _generate method.

        Args:
            messages: the prompt composed of a list of messages.
            stop: a list of strings on which the model should stop generating.
                  If generation stops due to a stop token, the stop token itself
                  SHOULD BE INCLUDED as part of the output. This is not enforced
                  across models right now, but it's a good practice to follow since
                  it makes it much easier to parse the output of the model
                  downstream and understand why generation stopped.
            run_manager: A run manager with callbacks for the LLM.
        """
        last_message = messages[-1]
        tokens = last_message.content[: self.n]

        for token in tokens:
            chunk = ChatGenerationChunk(message=AIMessageChunk(content=token))

            # Notify callbacks BEFORE yielding the chunk.
            if run_manager:
                run_manager.on_llm_new_token(token, chunk=chunk)

            yield chunk

    async def _astream(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> AsyncIterator[ChatGenerationChunk]:
        """An async variant of _stream.

        If not provided, the default behavior is to delegate to the _generate method.

        The implementation below instead will delegate to `_stream` and will
        kick it off in a separate thread.

        If you're able to natively support async, then by all means do so!
        """
        result = await run_in_executor(
            None,
            self._stream,
            messages,
            stop=stop,
            run_manager=run_manager.get_sync() if run_manager else None,
            **kwargs,
        )
        for chunk in result:
            yield chunk

    @property
    def _llm_type(self) -> str:
        """Get the type of language model used by this chat model."""
        return "echoing-chat-model-advanced"

    @property
    def _identifying_params(self) -> Dict[str, Any]:
        """Return a dictionary of identifying parameters."""
        return {"n": self.n}
tip

The _astream implementation uses run_in_executor to launch the sync _stream in a separate thread.

You can use this trick if you want to reuse the _stream implementation, but if you’re able to implement code that’s natively async, that’s a better solution since that code will run with less overhead.
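As a rough, standard-library-only sketch of the thread hand-off idea, the code below exposes a synchronous generator as an async iterator by pulling each item through the event loop’s default executor. It does not use LangChain’s run_in_executor helper, and the names slow_tokens and iterate_in_thread are made up for illustration.

```python
import asyncio
from typing import AsyncIterator, Iterator, List

# Sentinel returned by next() when the generator is exhausted.
_DONE = object()


def slow_tokens(text: str) -> Iterator[str]:
    """Stand-in for a blocking, synchronous token stream."""
    for character in text:
        yield character


async def iterate_in_thread(iterator: Iterator[str]) -> AsyncIterator[str]:
    """Yield items from a sync iterator, running each next() call in a thread."""
    loop = asyncio.get_running_loop()
    while True:
        # next(iterator, _DONE) runs in the default thread pool, so a slow
        # step of the generator does not block the event loop.
        item = await loop.run_in_executor(None, next, iterator, _DONE)
        if item is _DONE:
            break
        yield item


async def collect(text: str) -> List[str]:
    return [token async for token in iterate_in_thread(slow_tokens(text))]


tokens = asyncio.run(collect("cat"))
print("|".join(tokens))  # c|a|t
```

Each item costs one round trip to the thread pool, which is part of the overhead that a natively async implementation avoids.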

Let’s test it 🧪

The chat model will implement the standard Runnable interface of LangChain which many of the LangChain abstractions support!

model = CustomChatModelAdvanced(n=3)
model.invoke(
    [
        HumanMessage(content="hello!"),
        AIMessage(content="Hi there human!"),
        HumanMessage(content="Meow!"),
    ]
)
AIMessage(content='Meo')
model.invoke("hello")
AIMessage(content='hel')
model.batch(["hello", "goodbye"])
[AIMessage(content='hel'), AIMessage(content='goo')]
for chunk in model.stream("cat"):
    print(chunk.content, end="|")
c|a|t|

Please see the implementation of _astream in the model! If you do not implement it, then no output will stream.

async for chunk in model.astream("cat"):
    print(chunk.content, end="|")
c|a|t|

Let’s try the astream_events API, which will also help double-check that all the callbacks were implemented!

async for event in model.astream_events("cat", version="v1"):
    print(event)
{'event': 'on_chat_model_start', 'run_id': 'e03c0b21-521f-4cb4-a837-02fed65cf1cf', 'name': 'CustomChatModelAdvanced', 'tags': [], 'metadata': {}, 'data': {'input': 'cat'}}
{'event': 'on_chat_model_stream', 'run_id': 'e03c0b21-521f-4cb4-a837-02fed65cf1cf', 'tags': [], 'metadata': {}, 'name': 'CustomChatModelAdvanced', 'data': {'chunk': AIMessageChunk(content='c')}}
{'event': 'on_chat_model_stream', 'run_id': 'e03c0b21-521f-4cb4-a837-02fed65cf1cf', 'tags': [], 'metadata': {}, 'name': 'CustomChatModelAdvanced', 'data': {'chunk': AIMessageChunk(content='a')}}
{'event': 'on_chat_model_stream', 'run_id': 'e03c0b21-521f-4cb4-a837-02fed65cf1cf', 'tags': [], 'metadata': {}, 'name': 'CustomChatModelAdvanced', 'data': {'chunk': AIMessageChunk(content='t')}}
{'event': 'on_chat_model_end', 'name': 'CustomChatModelAdvanced', 'run_id': 'e03c0b21-521f-4cb4-a837-02fed65cf1cf', 'tags': [], 'metadata': {}, 'data': {'output': AIMessageChunk(content='cat')}}

Identifying Params

LangChain has a callback system which allows implementing loggers to monitor the behavior of LLM applications.

Remember the _identifying_params property from earlier?

It’s passed to the callback system and is accessible to user-specified loggers.

Below we’ll implement a handler with just a single on_chat_model_start event to see where _identifying_params appears.

from typing import Any, Dict, List, Optional, Union
from uuid import UUID

from langchain_core.callbacks import AsyncCallbackHandler
from langchain_core.messages import BaseMessage
from langchain_core.outputs import (
    ChatGenerationChunk,
    ChatResult,
    GenerationChunk,
    LLMResult,
)


class SampleCallbackHandler(AsyncCallbackHandler):
    """Async callback handler that handles callbacks from LangChain."""

    async def on_chat_model_start(
        self,
        serialized: Dict[str, Any],
        messages: List[List[BaseMessage]],
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Any:
        """Run when a chat model starts running."""
        print("---")
        print("On chat model start.")
        # The identifying params show up under kwargs["invocation_params"].
        print(kwargs)


model.invoke("meow", stop=["woof"], config={"callbacks": [SampleCallbackHandler()]})
---
On chat model start.
{'invocation_params': {'n': 3, '_type': 'echoing-chat-model-advanced', 'stop': ['woof']}, 'options': {'stop': ['woof']}, 'name': None, 'batch_size': 1}
AIMessage(content='meo')

Contributing

We appreciate all chat model integration contributions.

Here’s a checklist to help make sure your contribution gets added to LangChain:

Documentation:

  • ☐ The model contains doc-strings for all initialization arguments, as these will be surfaced in the API Reference.
  • ☐ The class doc-string for the model contains a link to the model API if the model is powered by a service.

Tests:

  • ☐ Add unit or integration tests for the overridden methods. Verify that invoke, ainvoke, batch, and stream work if you’ve overridden the corresponding code.

Streaming (if you’re implementing it):

  • ☐ Provide an async implementation via _astream
  • ☐ Make sure to invoke the on_llm_new_token callback
  • ☐ on_llm_new_token is invoked BEFORE yielding the chunk

Stop Token Behavior:

  • ☐ Stop token should be respected
  • ☐ Stop token should be INCLUDED as part of the response
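One way to honor both points when your backend does not handle stop sequences itself is to post-process the generated text. The helper below is a hypothetical, standard-library-only sketch: truncate_at_stop is not a LangChain function, and it assumes the full text is available before truncation.

```python
from typing import List, Optional


def truncate_at_stop(text: str, stop: Optional[List[str]] = None) -> str:
    """Cut `text` at the earliest stop sequence, keeping that sequence."""
    best_start: Optional[int] = None
    best_end: Optional[int] = None
    for token in stop or []:
        if not token:
            continue  # An empty stop string would match everywhere; skip it.
        start = text.find(token)
        if start != -1 and (best_start is None or start < best_start):
            best_start, best_end = start, start + len(token)
    # No stop sequence found: return the text unchanged.
    return text if best_end is None else text[:best_end]


print(truncate_at_stop("Meow. Woof. Moo.", stop=["Woof"]))  # Meow. Woof
```

Keeping the stop sequence in the output lets downstream code tell a stop-triggered ending apart from one that ended for another reason.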

Secret API Keys:

  • ☐ If your model connects to an API it will likely accept API keys as part of its initialization. Use Pydantic’s SecretStr type for secrets, so they don’t get accidentally printed out when folks print the model.
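A minimal sketch of how SecretStr behaves, shown on a standalone Pydantic model rather than a chat model. The class name ExampleModelSettings and the key value are placeholders for illustration.

```python
from pydantic import BaseModel, SecretStr


class ExampleModelSettings(BaseModel):
    """Hypothetical settings object holding an API key."""

    api_key: SecretStr


settings = ExampleModelSettings(api_key="sk-not-a-real-key")

# Printing the object masks the secret instead of revealing it.
print(settings)

# The raw value is only available through an explicit call.
raw_key = settings.api_key.get_secret_value()
```

Call get_secret_value() only at the point where the key is actually sent to the API, so the raw value does not end up in logs or reprs along the way.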
