Build RAG with Milvus and Gemini
The Gemini API and Google AI Studio help you start working with Google’s latest models and turn your ideas into applications that scale. Gemini provides access to powerful language models like Gemini-1.5-Flash, Gemini-1.5-Flash-8B, and Gemini-1.5-Pro for tasks such as text generation, document processing, vision, audio analysis, and more. The API allows you to input long context with millions of tokens, fine-tune models for specific tasks, generate structured outputs like JSON, and leverage capabilities like semantic retrieval and code execution.
In this tutorial, we will show you how to build a RAG (Retrieval-Augmented Generation) pipeline with Milvus and Gemini. We will use Milvus to store document embeddings and retrieve the passages most relevant to a query, and we will use the Gemini model to generate an answer grounded in those passages.
Preparation
Dependencies and Environment
$ pip install --upgrade pymilvus google-generativeai requests tqdm
If you are using Google Colab, you may need to restart the runtime to enable the newly installed dependencies (click on the “Runtime” menu at the top of the screen, and select “Restart session” from the dropdown menu).
You should first log in to the Google AI Studio platform and prepare the API key GEMINI_API_KEY as an environment variable.
import os
os.environ["GEMINI_API_KEY"] = "***********"
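If you would rather not hard-code the key in a notebook, one option is to read it from the environment and fail early with a clear message when it is missing. The sketch below is a minimal example using only the standard library; the helper name require_env is hypothetical and not part of any Gemini or Milvus API.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable, or raise a clear error.

    Hypothetical helper for illustration only.
    """
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"Environment variable {name} is not set")
    return value
```

With this in place, the later configuration call could read the key through require_env("GEMINI_API_KEY") instead of indexing os.environ directly.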
Prepare the data
We use the FAQ pages from the Milvus Documentation 2.4.x as the private knowledge in our RAG, which is a good data source for a simple RAG pipeline.
Download the zip file and extract documents to the folder milvus_docs.
$ wget https://github.com/milvus-io/milvus-docs/releases/download/v2.4.6-preview/milvus_docs_2.4.x_en.zip
$ unzip -q milvus_docs_2.4.x_en.zip -d milvus_docs
We load all markdown files from the folder milvus_docs/en/faq. For each document, we simply split the content on "# ", which roughly separates the file into its main sections.
from glob import glob

text_lines = []

# Read every FAQ markdown file and split it into rough sections on "# ".
for file_path in glob("milvus_docs/en/faq/*.md", recursive=True):
    with open(file_path, "r") as file:
        file_text = file.read()

    text_lines += file_text.split("# ")
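To see what splitting on "# " actually produces, here is a small self-contained sketch on a made-up markdown string. The sample text and the clean_chunks helper are illustrative assumptions, not part of the tutorial's data or of any library. Because "# " also matches the end of deeper heading markers such as "### ", a chunk can end with leftover "#" characters, and the first chunk can be empty.

```python
# Made-up markdown sample; the real FAQ files are longer.
sample = (
    "# FAQ\n\n"
    "### Where is data stored?\n\nIn object storage.\n\n"
    "### Is it fast?\n\nYes.\n"
)

# Same split as used for the FAQ files.
chunks = sample.split("# ")


def clean_chunks(raw_chunks):
    """Hypothetical helper: trim leftover heading markers and drop empty chunks."""
    cleaned = []
    for chunk in raw_chunks:
        text = chunk.rstrip("# \n").strip()
        if text:
            cleaned.append(text)
    return cleaned


cleaned = clean_chunks(chunks)
print(len(chunks), len(cleaned))
for text in cleaned:
    print(repr(text))
```

The tutorial keeps the raw chunks as they are, which is why the retrieved passages later end in "###". A cleanup step like this is optional.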
Prepare the LLM and Embedding Model
We use gemini-1.5-flash as the LLM and text-embedding-004 as the embedding model.
Let’s try to generate a test response from the LLM:
import google.generativeai as genai
genai.configure(api_key=os.environ["GEMINI_API_KEY"])
gemini_model = genai.GenerativeModel("gemini-1.5-flash")
response = gemini_model.generate_content("who are you")
print(response.text)
I am a large language model, trained by Google. I am an AI and don't have a personal identity or consciousness. My purpose is to process information and respond to a wide range of prompts and questions in a helpful and informative way.
Generate a test embedding and print its dimension and first few elements.
test_embeddings = genai.embed_content(
    model="models/text-embedding-004", content=["This is a test1", "This is a test2"]
)["embedding"]
embedding_dim = len(test_embeddings[0])
print(embedding_dim)
print(test_embeddings[0][:10])
768
[0.013588584, -0.004361838, -0.08481652, -0.039724775, 0.04723794, -0.0051557426, 0.026071774, 0.045514572, -0.016867816, 0.039378334]
Load data into Milvus
Create the Collection
from pymilvus import MilvusClient
milvus_client = MilvusClient(uri="./milvus_demo.db")
collection_name = "my_rag_collection"
As for the argument of MilvusClient:
- Setting the uri as a local file, e.g. ./milvus.db, is the most convenient method, as it automatically utilizes Milvus Lite to store all data in this file.
- If you have a large amount of data, you can set up a more performant Milvus server on Docker or Kubernetes. In this setup, please use the server uri, e.g. http://localhost:19530, as your uri.
- If you want to use Zilliz Cloud, the fully managed cloud service for Milvus, adjust the uri and token, which correspond to the Public Endpoint and API key in Zilliz Cloud.
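The three options above differ only in the uri and, for Zilliz Cloud, the token. As a rough sketch, the keyword arguments could be assembled by a small helper and then passed on as MilvusClient(**kwargs). The helper name milvus_client_kwargs and the angle-bracket placeholders are assumptions for illustration; substitute your own endpoint and key.

```python
from typing import Optional


def milvus_client_kwargs(uri: str, token: Optional[str] = None) -> dict:
    """Hypothetical helper: build keyword arguments for MilvusClient."""
    kwargs = {"uri": uri}
    if token:
        # Only Zilliz Cloud (or a server with authentication) needs a token.
        kwargs["token"] = token
    return kwargs


# Local file: Milvus Lite stores everything in this file.
local_kwargs = milvus_client_kwargs("./milvus_demo.db")

# Self-hosted server on Docker or Kubernetes.
server_kwargs = milvus_client_kwargs("http://localhost:19530")

# Zilliz Cloud: placeholder values, not real credentials.
cloud_kwargs = milvus_client_kwargs(
    "<your-public-endpoint>", token="<your-api-key>"
)

print(local_kwargs, server_kwargs, cloud_kwargs, sep="\n")
```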
Check if the collection already exists and drop it if it does.
if milvus_client.has_collection(collection_name):
    milvus_client.drop_collection(collection_name)
Create a new collection with specified parameters.
If we don’t specify any field information, Milvus will automatically create a default id field for primary key, and a vector field to store the vector data. A reserved JSON field is used to store non-schema-defined fields and their values.
milvus_client.create_collection(
    collection_name=collection_name,
    dimension=embedding_dim,
    metric_type="IP",  # Inner product distance
    consistency_level="Strong",  # Strong consistency level
)
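The "IP" metric scores a pair of vectors by their inner product, so a larger value means a closer match. The following sketch, in plain Python with made-up two-dimensional vectors, shows how the inner product is computed and how it differs from cosine similarity when vectors are not unit length. It illustrates the metric only and is not how Milvus implements search internally.

```python
import math


def inner_product(a, b):
    """Sum of element-wise products of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))


def normalize(v):
    """Scale a vector to unit length."""
    norm = math.sqrt(inner_product(v, v))
    return [x / norm for x in v]


def cosine(a, b):
    """Cosine similarity is the inner product of the normalized vectors."""
    return inner_product(normalize(a), normalize(b))


# Made-up toy vectors for illustration.
query = [1.0, 0.0]
docs = {"a": [0.9, 0.1], "b": [0.0, 1.0], "c": [3.0, 3.0]}

ranked_by_ip = sorted(docs, key=lambda k: inner_product(query, docs[k]), reverse=True)
ranked_by_cosine = sorted(docs, key=lambda k: cosine(query, docs[k]), reverse=True)

print("IP ranking:    ", ranked_by_ip)
print("cosine ranking:", ranked_by_cosine)
```

In this toy data the long vector "c" wins under inner product but not under cosine similarity. When all vectors are unit length the two rankings coincide.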
Insert data
Iterate through the text lines, create embeddings, and then insert the data into Milvus.
Here we add a new field, text, which is not defined in the collection schema. It will be automatically added to the reserved JSON dynamic field, which can be treated as a normal field at a high level.
from tqdm import tqdm

data = []

# Embed all text chunks in a single request.
doc_embeddings = genai.embed_content(
    model="models/text-embedding-004", content=text_lines
)["embedding"]

# Pair each chunk with its embedding and an integer id.
for i, line in enumerate(tqdm(text_lines, desc="Creating embeddings")):
    data.append({"id": i, "vector": doc_embeddings[i], "text": line})

milvus_client.insert(collection_name=collection_name, data=data)
Creating embeddings: 100%|██████████| 72/72 [00:00<00:00, 468201.38it/s]
{'insert_count': 72, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71], 'cost': 0}
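The call above sends all 72 chunks to the embedding endpoint in one request. For a larger corpus you may want to send the texts in smaller groups. The sketch below shows one way to do that with an injected embed_fn callable; the helper names, the default batch size of 100, and the fake embedding function are all assumptions made so the example runs without network access.

```python
def batched(items, batch_size):
    """Yield successive slices of `items` with at most `batch_size` elements."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def embed_in_batches(texts, embed_fn, batch_size=100):
    """Hypothetical helper: embed texts group by group, preserving order."""
    embeddings = []
    for batch in batched(texts, batch_size):
        embeddings.extend(embed_fn(batch))
    return embeddings


# Stand-in for a real embedding call: one tiny vector per text.
def fake_embed(batch):
    return [[float(len(text))] for text in batch]


vectors = embed_in_batches(["a", "bb", "ccc", "dddd", "eeeee"], fake_embed, batch_size=2)
print(vectors)
```

In the tutorial's setting, embed_fn could be a small wrapper that calls genai.embed_content with content=batch and returns the "embedding" entry of the result.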
Build RAG
Retrieve data for a query
Let’s specify a frequent question about Milvus.
question = "How is data stored in milvus?"
Search for the question in the collection and retrieve the semantic top-3 matches.
question_embedding = genai.embed_content(
    model="models/text-embedding-004", content=question
)["embedding"]

search_res = milvus_client.search(
    collection_name=collection_name,
    data=[question_embedding],
    limit=3,  # Return top 3 results
    search_params={"metric_type": "IP", "params": {}},  # Inner product distance
    output_fields=["text"],  # Return the text field
)
Let’s take a look at the search results of the query.
import json
retrieved_lines_with_distances = [
    (res["entity"]["text"], res["distance"]) for res in search_res[0]
]
print(json.dumps(retrieved_lines_with_distances, indent=4))
[
[
" Where does Milvus store data?\n\nMilvus deals with two types of data, inserted data and metadata. \n\nInserted data, including vector data, scalar data, and collection-specific schema, are stored in persistent storage as incremental log. Milvus supports multiple object storage backends, including [MinIO](https://min.io/), [AWS S3](https://aws.amazon.com/s3/?nc1=h_ls), [Google Cloud Storage](https://cloud.google.com/storage?hl=en#object-storage-for-companies-of-all-sizes) (GCS), [Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs), [Alibaba Cloud OSS](https://www.alibabacloud.com/product/object-storage-service), and [Tencent Cloud Object Storage](https://www.tencentcloud.com/products/cos) (COS).\n\nMetadata are generated within Milvus. Each Milvus module has its own metadata that are stored in etcd.\n\n###",
0.8048275113105774
],
[
"Does the query perform in memory? What are incremental data and historical data?\n\nYes. When a query request comes, Milvus searches both incremental data and historical data by loading them into memory. Incremental data are in the growing segments, which are buffered in memory before they reach the threshold to be persisted in storage engine, while historical data are from the sealed segments that are stored in the object storage. Incremental data and historical data together constitute the whole dataset to search.\n\n###",
0.7574886679649353
],
[
"What is the maximum dataset size Milvus can handle?\n\n \nTheoretically, the maximum dataset size Milvus can handle is determined by the hardware it is run on, specifically system memory and storage:\n\n- Milvus loads all specified collections and partitions into memory before running queries. Therefore, memory size determines the maximum amount of data Milvus can query.\n- When new entities and and collection-related schema (currently only MinIO is supported for data persistence) are added to Milvus, system storage determines the maximum allowable size of inserted data.\n\n###",
0.7453608512878418
]
]
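Each hit in search_res[0] carries the stored fields under "entity" and the similarity score under "distance". If you want to pass only sufficiently relevant passages to the LLM, one option is to filter hits by a minimum score. The sketch below runs on mock hits shaped like the results above; the filter_hits helper, the threshold of 0.75, and the mock values are illustrative assumptions.

```python
def filter_hits(hits, min_score):
    """Hypothetical helper: keep (text, distance) pairs scoring at least min_score.

    With the inner product metric, a higher score means a closer match.
    """
    return [
        (hit["entity"]["text"], hit["distance"])
        for hit in hits
        if hit["distance"] >= min_score
    ]


# Mock hits in the same shape as one entry of the search result list.
mock_hits = [
    {"entity": {"text": "Where does Milvus store data?"}, "distance": 0.80},
    {"entity": {"text": "Does the query perform in memory?"}, "distance": 0.76},
    {"entity": {"text": "An unrelated passage."}, "distance": 0.60},
]

kept = filter_hits(mock_hits, min_score=0.75)
print(kept)
```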
Use LLM to get a RAG response
Convert the retrieved documents into a string format.
context = "\n".join(
    [line_with_distance[0] for line_with_distance in retrieved_lines_with_distances]
)
Define system and user prompts for the Language Model. The user prompt is assembled with the retrieved documents from Milvus.
SYSTEM_PROMPT = """
Human: You are an AI assistant. You are able to find answers to the questions from the contextual passage snippets provided.
"""
USER_PROMPT = f"""
Use the following pieces of information enclosed in <context> tags to provide an answer to the question enclosed in <question> tags.
<context>
{context}
</context>
<question>
{question}
</question>
"""
Use Gemini to generate a response based on the prompts.
gemini_model = genai.GenerativeModel(
    "gemini-1.5-flash", system_instruction=SYSTEM_PROMPT
)
response = gemini_model.generate_content(USER_PROMPT)
print(response.text)
Milvus stores data in two ways: Inserted data (vector data, scalar data, and collection-specific schema) is stored as an incremental log in persistent storage using object storage backends such as MinIO, AWS S3, Google Cloud Storage, Azure Blob Storage, Alibaba Cloud OSS, and Tencent Cloud Object Storage. Metadata, generated by each Milvus module, is stored in etcd.
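The retrieve-then-generate steps can also be wrapped into a single function so that different questions reuse the same flow. The sketch below is one possible arrangement, not the tutorial's own code: embed_fn, search_fn, and generate_fn are hypothetical callables that you would back with the embedding call, the Milvus search, and the Gemini model respectively. Here they are replaced by stand-ins so the example runs offline.

```python
def build_user_prompt(context, question):
    """Assemble the user prompt with <context> and <question> tags."""
    return (
        "Use the following pieces of information enclosed in <context> tags "
        "to provide an answer to the question enclosed in <question> tags.\n"
        f"<context>\n{context}\n</context>\n"
        f"<question>\n{question}\n</question>\n"
    )


def rag_answer(question, embed_fn, search_fn, generate_fn, top_k=3):
    """Hypothetical wrapper: embed the question, retrieve passages, generate."""
    query_vector = embed_fn(question)          # text -> vector
    passages = search_fn(query_vector, top_k)  # vector -> list of passage strings
    context = "\n".join(passages)
    return generate_fn(build_user_prompt(context, question))


# Stand-in callables so the sketch runs without network access.
def fake_embed(text):
    return [float(len(text))]


def fake_search(vector, top_k):
    corpus = [
        "Metadata are stored in etcd.",
        "Inserted data are stored in object storage.",
    ]
    return corpus[:top_k]


def fake_generate(prompt):
    # Echo the prompt so the assembled text can be inspected.
    return prompt


print(rag_answer("How is data stored in milvus?", fake_embed, fake_search, fake_generate))
```

Injecting the three callables keeps the flow easy to test and makes it simple to swap the embedding model or the LLM later.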
Great! We have successfully built a RAG pipeline with Milvus and Gemini.