Guide to extending OpenWebUI using Pipelines

OpenWebUI is a self-hosted UI for interacting with various LLM models, both on-device and in the cloud. I use it as my primary method of interacting with LLMs due to its wide variety of models, ability to keep data local with locally deployed models, and extensive features. I've set it up along with Tailscale on my homelab so my family can access it with their own logins, maintaining their conversation history and settings.

Recently, I stumbled upon Storm from Stanford, a tool that uses LLMs and Search to generate long Wiki-like articles. It's useful for my personal workflows by providing a jumping point for deeper research. I aimed to bring this functionality to OpenWebUI, so I began exploring Pipelines.

Pipelines serve as the mechanism in OpenWebUI to extend its capabilities. Unfortunately, they are still not well-documented, which led me to dig into the code, scratch my head, and debug to integrate Storm finally. I hope this post serves as an introductory guide for anyone trying to do the same, saving you a few hours of head-scratching.

Valves

Briefly Valves is the mechanism for configuring your pipeline. This is the mechanism for user to change something about your pipeline e.g. if you want to get some API_KEY then that will be a Valve, if you want some value from user which will change your pipeline behavior then that will also be a Valve. The admin can see and update all these values from the OpenWebUI settings.

Pipelines

Following are different type of pipelines you can create

Filter

Filter pipelines allows you to intercept the user request/message before it goes to LLM model and also after the response comes from LLM model but before its sent to users. This is what can allow various scenarios such as

  • RAG to fetch more context and put it into the message to LLM to use.
  • Tools that gets executed and adds any context for LLM
  • Prompt injection filter to catch them before LLM gets to respond
  • Safety filters e.g. using Meta LLamaGuard before user request is answered

How I like to think of this is that if I want to do something before or after LLM is called then I would create a Filter pipeline.

Filter pipeline flow which is "Chat request -> Inlet -> LLM model -> Outlet -> Chat response"

Here is what a filter pipeline would look like

from typing import List, Optional
from pydantic import BaseModel
from schemas import OpenAIChatMessage


class Pipeline:
    class Valves(BaseModel):
        # List target pipeline ids (models) that this filter will be connected to.
        # If you want to connect this filter to all pipelines, you can set pipelines to ["*"]
        pipelines: List[str] = []

        # Assign a priority level to the filter pipeline.
        # The priority level determines the order in which the filter pipelines are executed.
        # The lower the number, the higher the priority.
        priority: int = 0

        # Add your custom parameters/configuration here e.g. API_KEY that you want user to configure etc.
        pass

    def __init__(self):
        self.type = "filter"
        self.name = "Filter"
        self.valves = self.Valves(**{"pipelines": ["*"]})

        pass

    async def on_startup(self):
        # This function is called when the server is started.
        print(f"on_startup:{__name__}")
        pass

    async def on_shutdown(self):
        # This function is called when the server is stopped.
        print(f"on_shutdown:{__name__}")
        pass

    async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
        # This filter is applied to the form data BEFORE it is sent to the LLM API.
        print(f"inlet:{__name__}")

        return body
        
    async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
    	This filter is applied to the form data AFTER it is sent to the LLM API.
        print(f"outlet:{__name__}")

You intercept the messages using the body that is passed in. It contains all the information e.g. messages which contains the message history. You can use utility methods such as get_last_user_message, get_last_assistant_message to get latest messages, do something with them and update the corresponding message content and return back the whole body with the updated messages e.g.

....
from utils.pipelines.main import get_last_user_message, get_last_assistant_message

class Pipeline:
	...
    
    async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
        messages = body.get("messages", [])
        user_message = get_last_user_message(messages)
        
        if user_message is not None:
            # Do something

            for message in reversed(messages):
                if message["role"] == "user":
                    message["content"] = "UPDATED CORRESPONDING CONTENT THAT LLM WILL USE"
                    break

        body = {**body, "messages": messages}
        return body
        
    async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
    	messages = body["messages"]
        assistant_message = get_last_assistant_message(messages)

	if assistant_message is not None:
            # Do something

            for message in reversed(messages):
                if message["role"] == "assistant":
                message["content"] = "UPDATED CORRESPONDING CONTENT THAT USER WILL SEE"
                break

        body = {**body, "messages": messages}
        return body

Images are also passed in as part of message so check for "images" in message to get the Base64 encoded images and you can then use that to do any kind of image processing you want.

Tools

Tools are special type of filters where a particular tool is selected based on its description and what the user has asked for e.g. if user asks math question and you want it to actually calculate instead of just hallucinating the answer then Calculator Tool will be the way to go. To create Tools your Pipeline needs to inherit from FunctionCallingBlueprint as it implements the inlet part of filter to do function calling.

import os
import requests
from typing import Literal, List, Optional
from datetime import datetime


from blueprints.function_calling_blueprint import Pipeline as FunctionCallingBlueprint


class Pipeline(FunctionCallingBlueprint):
    class Valves(FunctionCallingBlueprint.Valves):
        # Add your custom parameters/configuration here e.g. API_KEY that you want user to configure etc.
        pass

    class Tools:
        def __init__(self, pipeline) -> None:
            self.pipeline = pipeline

        def calculator(self, equation: str) -> str:
            """
            Calculate the result of an equation.

            :param equation: The equation to calculate.
            """

            try:
                result = eval(equation)
                return f"{equation} = {result}"
            except Exception as e:
                print(e)
                return "Invalid equation"

    def __init__(self):
        super().__init__()
        self.name = "My Calculator Tool Pipeline"
        self.valves = self.Valves(
            **{
                **self.valves.model_dump(),
                "pipelines": ["*"],  # Connect to all pipelines
            },
        )
        self.tools = self.Tools(self)

In above example if you don't want a function in Tools class to be used then prefix it with __ e.g. __helper_function_to_do_something. If you setup a Valve e.g. to get some configuration from user then you can access it in tool as self.pipeline.valves.CUSTOM_PARAM, though in my experience I was able to access it when function was invoked but not in Tools.__init__ as in that case the value was still None for those Valves.

If you are also wondering that which model is used for figuring out the Tool, then the FunctionCallingBlueprint creates a Valve called TASK_MODEL which is used to figure out which function/tool to call. You can update it to whatever you want from the Settings in OpenWebUI if you don't prefer the default.

Pipe

Pipe/Manifold pipeline flow which is "Chat request ->Pipe -> Chat response"

This is when you want to take over what happens when user uses chat in OpenWebUI. The "pipe" pipeline allows you to integrate new LLM providers, build workflows that takes the user message and respond, complete RAG system that does retrieval and also generation using the LLM you want.

Basically if you want to take over what happens when user sends a request then you will implement the "pipe" function.

class Pipeline:

    class Valves(BaseModel):
        pass

    def __init__(self):
        self.valves = self.Valves()

    async def on_startup(self):
        print(f"on_startup:{__name__}")
        pass

    async def on_shutdown(self):
        print(f"on_shutdown:{__name__}")
        pass

    def pipe(
        self, user_message: str, model_id: str, messages: List[dict], body: dict
    ) -> Union[str, Generator, Iterator]:

        return "HERE IS THE RESPONSE"

In the scenario I was trying to implement, I opted for this as I wanted to integrate the Stanford Storm Wiki which basically takes a topic, makes multiple calls to LLM to research the topic and create outline and finally write Wiki like article on it.

Manifold

Manifold is a special type of "pipe" Pipeline as it allows user to select specific model. Various LLM integration in OpenWebUI such as Anthropic, Groq uses this as they also tell OpenWebUI the list of models. So if you want to implement a new LLM provider then it likely will be a manifold

class Pipeline:
	...
    def __init__(self):
    	self.type = "manifold"
        ...
    
    def pipelines(self) -> List[dict]:
    	return ["model-1", "model-2"]
        
    def pipe(
        self, user_message: str, model_id: str, messages: List[dict], body: dict
    ) -> Union[str, Generator, Iterator]:
    	
        # Here use the `model_id` that user picked
        pass

How to add more PIP dependencies

As the pipeline runs in its own docker which may not have the python package you need. In that case you can specific the requirements in the start of file using the front-matter specification. This can involve description, name, author etc. Along with that you can also specific comma separate "requirements" which will be used to install any new dependencies.

"""
title: Filter Pipeline
author: open-webui
date: 2024-05-30
version: 1.1
license: MIT
description: Example of a filter pipeline that can be used to edit the form data before it is sent to LLM API.
requirements: requests
"""

class Pipeline:
	...
💡
The caveat here is that for this to work, you would have add the pipelines when you start the docker. If you install the pipeline through the OpenWebUI interface then it won't install these additional dependencies.```docker run -d -p 9099:9099 --add-host=host.docker.internal:host-gateway -e PIPELINES_URLS="https://github.com/open-webui/pipelines/blob/main/examples/filters/detoxify_filter_pipeline.py" -v pipelines:/app/pipelines --name pipelines --restart always ghcr.io/open-webui/pipelines:main```Here the pipelines installed through PIPELINES_URLS will parse the front matter and install additional dependencies.

Other Tips

  • If you install the pipeline through OpenWebUI, it won't throw any error if it failed to add the pipeline due to dependency issue or code issue. The way I found to debug that is to get the logs from the running docker
docker logs -f container_name
  • Look at various examples in the pipelines repo to see whats the closest scenario to what you are trying to build. Use that as the starting point and edit those.