In generative AI applications, the performance bottleneck often comes from the long latency of LLM execution, so it's crucial to address it. In this article, I will share how we leverage Haystack and Hamilton to make our AI pipelines fully asynchronous so they can support 1500+ concurrent users!
Wren AI is a text-to-SQL solution for data teams to get results and insights faster by asking business questions without writing SQL. It’s open-source! Wren AI LLM Service is responsible for LLM-related tasks like converting natural language questions into SQL queries and providing step-by-step SQL breakdowns.
Notes: the tasks described above are the main responsibilities of the Wren AI LLM Service as of now; several other features are in our backlog or in progress, such as text-to-chart, recommending follow-up questions, recommending new questions after the user is onboarded, a feedback loop, clarification, and an LLM system evaluation framework. You are welcome to discuss these with us and share your thoughts on our Discord server!
The first version of Wren AI LLM Service did not utilize any large language model (LLM) framework. As a result, the first version was a naive RAG implementation of the text-to-SQL task.
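In outline, that first version was a minimal retrieve-then-generate loop. Here is a sketch of the shape (the retriever.search and llm.generate interfaces are hypothetical, for illustration only):

from typing import Any, List


def naive_text_to_sql(question: str, retriever: Any, llm: Any) -> str:
    # 1. Retrieve schema and example documents relevant to the question
    #    (retriever.search and llm.generate are hypothetical interfaces).
    documents: List[str] = retriever.search(question)
    # 2. Stuff everything into a single prompt and ask the LLM for SQL.
    prompt = (
        "Given these tables:\n"
        + "\n".join(documents)
        + f"\n\nWrite a SQL query that answers: {question}"
    )
    return llm.generate(prompt)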
The Challenge
After the first version was done, we realized we would face several challenges in supporting more complex AI pipelines.
After a week of research, we chose Haystack as our infrastructure for building generative AI applications; we started using it during its 2.0 beta. There were several reasons for the choice.
As a result, we can easily develop more complex AI pipelines and integrate other LLMs and Document Stores right away.
The Next Challenge
You may think Haystack is the perfect choice for building generative AI applications; unfortunately, no software is perfect. There is one challenge for our use case: we need to consider the application's performance in terms of user requests per second (a.k.a. throughput) and time spent per user request (a.k.a. latency).
Since Haystack doesn't have built-in async support, pipeline computation is synchronous by default, and the bottleneck of generative AI applications is usually rooted in the long latency of LLM execution. So we need to make our AI pipelines async to address the issue. (This is one of the most frequently raised topics in the community, and the Haystack team has shared a slide deck introducing several proposals to address it.)
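As a stopgap, a synchronous pipeline can be pushed onto a worker thread so it at least stops blocking FastAPI's event loop. Here is a minimal sketch of that generic workaround (not the approach we ultimately took):

import asyncio

from haystack import Pipeline


async def run_pipeline_async(pipeline: Pipeline, inputs: dict) -> dict:
    # Offload the blocking, synchronous run() call to a worker thread so the
    # event loop can keep serving other requests.
    return await asyncio.to_thread(pipeline.run, inputs)

The thread pool then becomes the new concurrency ceiling, which is why we kept looking for a solution that is asynchronous end to end.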
We decided that a candidate solution should satisfy at least a few criteria.
We found a library called Hamilton, a lightweight Python library that helps us separate pipeline computation logic from the computation runtime environment.
Quoted from Hamilton’s GitHub page:
Hamilton is a lightweight Python library for directed acyclic graphs (DAGs) of data transformations. Your DAG is portable; it runs anywhere Python runs, whether it’s a script, notebook, Airflow pipeline, FastAPI server, etc. Your DAG is expressive; Hamilton has extensive features to define and modify the execution of a DAG (e.g., data validation, experiment tracking, remote execution).
To create a DAG, write regular Python functions that specify their dependencies with their parameters. As shown below, it results in readable code that can always be visualized. Hamilton loads that definition and automatically builds the DAG for you!
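As a concrete illustration, here is a minimal, self-contained sketch (the function names and inputs are made up for this example) of how parameter names declare dependencies in a Hamilton DAG:

import sys

from hamilton import driver


# Each function becomes a node in the DAG; its parameter names refer to the
# upstream nodes (or runtime inputs) it depends on.
def cleaned_query(query: str) -> str:
    return query.strip().lower()


def word_count(cleaned_query: str) -> int:
    return len(cleaned_query.split())


if __name__ == "__main__":
    dr = driver.Driver({}, sys.modules[__name__])
    # Requesting "word_count" makes Hamilton resolve cleaned_query -> query.
    print(dr.execute(["word_count"], inputs={"query": "  Show me monthly revenue  "}))
    # {'word_count': 4}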
You can see the differences between using Haystack alone and Haystack integrated with Hamilton in the code snippets provided below:
Code snippets before rewriting the AI pipeline using Hamilton
We define the components in the __init__ method of the Generation class with the pipeline's built-in add_component and connect methods, and we call the run method to invoke the pipeline execution:

import logging
from typing import Dict, List

from haystack import Document, Pipeline
from haystack.components.builders.prompt_builder import PromptBuilder

# The following come from the Wren AI codebase: BasicPipeline, LLMProvider,
# init_generation_post_processor, timer, text_to_sql_system_prompt,
# text_to_sql_user_prompt_template, TEXT_TO_SQL_RULES

logger = logging.getLogger(__name__)


class Generation(BasicPipeline):
    def __init__(
        self,
        llm_provider: LLMProvider,
    ):
        self._pipeline = Pipeline()
        # Register each component under a name.
        self._pipeline.add_component(
            "text_to_sql_prompt_builder",
            PromptBuilder(template=text_to_sql_user_prompt_template),
        )
        self._pipeline.add_component(
            "text_to_sql_generator",
            llm_provider.get_generator(system_prompt=text_to_sql_system_prompt),
        )
        self._pipeline.add_component(
            "post_processor", init_generation_post_processor()
        )

        # Wire each component's output to the next component's input.
        self._pipeline.connect(
            "text_to_sql_prompt_builder.prompt", "text_to_sql_generator.prompt"
        )
        self._pipeline.connect(
            "text_to_sql_generator.replies", "post_processor.replies"
        )

        super().__init__(self._pipeline)

    @timer
    def run(
        self,
        query: str,
        contexts: List[Document],
        exclude: List[Dict],
        include_outputs_from: List[str] | None = None,
    ):
        logger.info("Ask Generation pipeline is running...")
        # The whole pipeline runs synchronously: this call blocks until the
        # LLM responds.
        return self._pipeline.run(
            {
                "text_to_sql_prompt_builder": {
                    "query": query,
                    "documents": contexts,
                    "alert": TEXT_TO_SQL_RULES,
                    "exclude": exclude,
                }
            },
            include_outputs_from=(
                set(include_outputs_from) if include_outputs_from else None
            ),
        )
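Calling this version is straightforward, but the call blocks the uvicorn worker for the whole LLM round trip (a hypothetical usage sketch; llm_provider and retrieved_documents are assumed to come from the surrounding service code):

pipeline = Generation(llm_provider=llm_provider)

# Blocks until the LLM responds; no other request can use this worker meanwhile.
result = pipeline.run(
    query="What is the total revenue per month?",
    contexts=retrieved_documents,
    exclude=[],
)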
Code snippets after rewriting the AI pipeline using Hamilton
We no longer wire the components together in the __init__ method of the Generation class; instead, we declare how the components should stick together by describing the connections between them using plain functions. We still call a run method to invoke pipeline execution, but under the hood it's Hamilton's AsyncDriver:

import logging
import sys
from typing import Any, Dict, List

from hamilton import base
from hamilton.experimental.h_async import AsyncDriver  # hamilton.async_driver in newer releases
from haystack import Document
from haystack.components.builders.prompt_builder import PromptBuilder

# The following come from the Wren AI codebase: BasicPipeline, Engine,
# GenerationPostProcessor, LLMProvider, async_timer, timer,
# text_to_sql_system_prompt, text_to_sql_user_prompt_template, TEXT_TO_SQL_RULES

logger = logging.getLogger(__name__)


## Start of Pipeline
@timer
def prompt(
    query: str,
    documents: List[Document],
    exclude: List[Dict],
    alert: str,
    prompt_builder: PromptBuilder,
) -> dict:
    logger.debug(f"query: {query}")
    logger.debug(f"documents: {documents}")
    logger.debug(f"exclude: {exclude}")
    return prompt_builder.run(
        query=query, documents=documents, exclude=exclude, alert=alert
    )


@async_timer
async def generate(prompt: dict, generator: Any) -> dict:
    logger.debug(f"prompt: {prompt}")
    return await generator.run(prompt=prompt.get("prompt"))


@async_timer
async def post_process(
    generate: dict, post_processor: GenerationPostProcessor
) -> dict:
    logger.debug(f"generate: {generate}")
    return await post_processor.run(generate.get("replies"))
## End of Pipeline


class Generation(BasicPipeline):
    def __init__(
        self,
        llm_provider: LLMProvider,
        engine: Engine,
    ):
        self.generator = llm_provider.get_generator(
            system_prompt=text_to_sql_system_prompt
        )
        self.prompt_builder = PromptBuilder(template=text_to_sql_user_prompt_template)
        self.post_processor = GenerationPostProcessor(engine=engine)

        # Build the DAG from this module's functions; AsyncDriver runs the
        # async nodes on the event loop instead of blocking.
        super().__init__(
            AsyncDriver({}, sys.modules[__name__], result_builder=base.DictResult())
        )

    @async_timer
    async def run(
        self,
        query: str,
        contexts: List[Document],
        exclude: List[Dict],
    ):
        logger.info("Ask Generation pipeline is running...")
        return await self._pipe.execute(
            ["post_process"],
            inputs={
                "query": query,
                "documents": contexts,
                "exclude": exclude,
                "alert": TEXT_TO_SQL_RULES,
                "generator": self.generator,
                "prompt_builder": self.prompt_builder,
                "post_processor": self.post_processor,
            },
        )
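Because run is now a coroutine, a single event loop can drive many requests concurrently. A minimal sketch (llm_provider, engine, and retrieved_documents are again assumed to come from the surrounding service code):

import asyncio


async def main():
    pipeline = Generation(llm_provider=llm_provider, engine=engine)
    # While one request awaits the LLM, the event loop progresses the others.
    results = await asyncio.gather(
        *(
            pipeline.run(query=q, contexts=retrieved_documents, exclude=[])
            for q in ["Total revenue per month?", "Top 10 customers by sales?"]
        )
    )
    print(results)


asyncio.run(main())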
Now I am going to show you a series of performance-testing results to demonstrate that our architecture rewrite is successful and why we say our LLM service can handle 1500+ concurrent users. (Wren AI LLM Service is a FastAPI server run with one uvicorn worker.)
First of all, the performance improvement is substantial:
Performance statistics before rewriting the AI pipeline using Hamilton
Performance statistics after rewriting the AI pipeline using Hamilton
Also, to make sure our AI pipeline is indeed capable of asynchronous computation, I built a "simulated API" that mimics the AI pipeline's behavior to compare against the real AI pipeline API.
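A simulated endpoint along these lines (a sketch; the route, payload, and sleep duration are illustrative assumptions, not the exact test harness) swaps the LLM call for a non-blocking sleep, isolating the server's async behavior from the model's variance:

import asyncio

from fastapi import FastAPI

app = FastAPI()


@app.post("/simulated-ask")
async def simulated_ask() -> dict:
    await asyncio.sleep(8)  # stands in for LLM latency; the value is illustrative
    return {"sql": "SELECT 1"}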
You can compare their performance statistics across three metrics: the number of requests finished, average requests per second (a.k.a. throughput), and average response time (a.k.a. latency).

Performance statistics of the simulated API version

Performance statistics of the real AI pipeline API version
You can see that both have similar performance statistics, so we believe the third-version architecture is a success!
Finally, I will share how we can infer that our LLM service can handle 1500+ concurrent users. Since the performance statistics of the simulated API and the real API are similar, I hypothesize that if the simulated API version can handle 1500+ concurrent users, then the real API version can achieve the same result. (This assumes the LLM itself can handle 1500+ concurrent users.)
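To reproduce this kind of experiment (the post does not prescribe a load-testing tool; the endpoint and pacing below are assumptions), a Locust scenario could look like this, spawned with 1500 concurrent users:

from locust import HttpUser, between, task


class SimulatedUser(HttpUser):
    wait_time = between(1, 3)  # seconds each simulated user waits between requests

    @task
    def ask(self):
        self.client.post("/simulated-ask")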
You can see that the performance statistics are quite healthy!
Notes: the tests were all done on my MacBook Pro (14-inch, 2023, Apple M2 Pro, 16 GB memory).
I hope this article helps you understand some of the issues you may encounter while building generative AI applications, and that the tools mentioned here help you solve them more easily. They are all great open-source projects we can leverage and learn from!
Feel free to leave a comment in this post if you have any thoughts you would like to discuss on the topic. Thanks!
Thanks to Howard Chi and William Chang, who reviewed the post and provided feedback!
👉 GitHub: https://github.com/Canner/WrenAI
👉 X: https://twitter.com/getwrenai
Supercharge Your Data with AI Today!