Crie RAG com Milvus e DeepSeek
O DeepSeek permite que os desenvolvedores criem e dimensionem aplicativos de IA com modelos de linguagem de alto desempenho. Ele oferece inferência eficiente, APIs flexíveis e arquiteturas avançadas de Mixture-of-Experts (MoE) para tarefas robustas de raciocínio e recuperação.
Neste tutorial, mostraremos como criar um pipeline RAG (Retrieval-Augmented Generation) usando Milvus e DeepSeek.
Preparação
Dependências e ambiente
! pip install --upgrade pymilvus[model] openai requests tqdm
Se você estiver usando o Google Colab, para habilitar as dependências recém-instaladas, talvez seja necessário reiniciar o tempo de execução (clique no menu "Tempo de execução" na parte superior da tela e selecione "Reiniciar sessão" no menu suspenso).
O DeepSeek ativa a API de estilo OpenAI. Pode iniciar sessão no seu site oficial e preparar a chave api DEEPSEEK_API_KEY
como uma variável de ambiente.
import os
os.environ["DEEPSEEK_API_KEY"] = "***********"
Preparar os dados
Usamos as páginas de FAQ da Documentação do Milvus 2.4.x como o conhecimento privado em nosso RAG, que é uma boa fonte de dados para um pipeline RAG simples.
Descarregue o ficheiro zip e extraia os documentos para a pasta milvus_docs
.
! wget https://github.com/milvus-io/milvus-docs/releases/download/v2.4.6-preview/milvus_docs_2.4.x_en.zip
! unzip -q milvus_docs_2.4.x_en.zip -d milvus_docs
Carregamos todos os ficheiros markdown da pasta milvus_docs/en/faq
. Para cada documento, utilizamos simplesmente "#" para separar o conteúdo do ficheiro, o que permite separar aproximadamente o conteúdo de cada parte principal do ficheiro markdown.
from glob import glob
text_lines = []
for file_path in glob("milvus_docs/en/faq/*.md", recursive=True):
with open(file_path, "r") as file:
file_text = file.read()
text_lines += file_text.split("# ")
Preparar o LLM e o modelo de incorporação
O DeepSeek habilita a API no estilo OpenAI, e você pode usar a mesma API com pequenos ajustes para chamar o LLM.
from openai import OpenAI
deepseek_client = OpenAI(
api_key=os.environ["DEEPSEEK_API_KEY"],
base_url="https://api.deepseek.com",
)
Defina um modelo de incorporação para gerar incorporação de texto usando o milvus_model
. Usamos o modelo DefaultEmbeddingFunction
como exemplo, que é um modelo de incorporação pré-treinado e leve.
from pymilvus import model as milvus_model
embedding_model = milvus_model.DefaultEmbeddingFunction()
Gerar um embedding de teste e imprimir a sua dimensão e os primeiros elementos.
test_embedding = embedding_model.encode_queries(["This is a test"])[0]
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
768
[-0.04836066 0.07163023 -0.01130064 -0.03789345 -0.03320649 -0.01318448
-0.03041712 -0.02269499 -0.02317863 -0.00426028]
Carregar dados no Milvus
Criar a coleção
from pymilvus import MilvusClient
milvus_client = MilvusClient(uri="./milvus_demo.db")
collection_name = "my_rag_collection"
Quanto ao argumento de
MilvusClient
:
- Definir o
uri
como um ficheiro local, por exemplo,./milvus.db
, é o método mais conveniente, uma vez que utiliza automaticamente o Milvus Lite para armazenar todos os dados neste ficheiro.- Se tiver uma grande escala de dados, pode configurar um servidor Milvus mais eficiente em docker ou kubernetes. Nesta configuração, utilize o uri do servidor, por exemplo,
http://localhost:19530
, como o seuuri
.- Se pretender utilizar o Zilliz Cloud, o serviço de nuvem totalmente gerido para o Milvus, ajuste os endereços
uri
etoken
, que correspondem ao Public Endpoint e à chave Api no Zilliz Cloud.
Verificar se a coleção já existe e eliminá-la se existir.
if milvus_client.has_collection(collection_name):
milvus_client.drop_collection(collection_name)
Criar uma nova coleção com os parâmetros especificados.
Se não especificarmos qualquer informação de campo, o Milvus criará automaticamente um campo id
por defeito para a chave primária e um campo vector
para armazenar os dados vectoriais. Um campo JSON reservado é utilizado para armazenar campos não definidos pelo esquema e os respectivos valores.
milvus_client.create_collection(
collection_name=collection_name,
dimension=embedding_dim,
metric_type="IP", # Inner product distance
consistency_level="Strong", # Strong consistency level
)
Inserir dados
Itere pelas linhas de texto, crie embeddings e, em seguida, insira os dados no Milvus.
Aqui está um novo campo text
, que é um campo não definido no esquema da coleção. Será automaticamente adicionado ao campo dinâmico JSON reservado, que pode ser tratado como um campo normal a um nível elevado.
from tqdm import tqdm
data = []
doc_embeddings = embedding_model.encode_documents(text_lines)
for i, line in enumerate(tqdm(text_lines, desc="Creating embeddings")):
data.append({"id": i, "vector": doc_embeddings[i], "text": line})
milvus_client.insert(collection_name=collection_name, data=data)
Creating embeddings: 0%| | 0/72 [00:00<?, ?it/s]huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
To disable this warning, you can either:
- Avoid using `tokenizers` before the fork if possible
- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
Creating embeddings: 100%|██████████| 72/72 [00:00<00:00, 246522.36it/s]
{'insert_count': 72, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71], 'cost': 0}
Construir RAG
Recuperar dados para uma consulta
Vamos especificar uma pergunta frequente sobre o Milvus.
question = "How is data stored in milvus?"
Pesquise a pergunta na coleção e obtenha as 3 principais correspondências semânticas.
search_res = milvus_client.search(
collection_name=collection_name,
data=embedding_model.encode_queries(
[question]
), # Convert the question to an embedding vector
limit=3, # Return top 3 results
search_params={"metric_type": "IP", "params": {}}, # Inner product distance
output_fields=["text"], # Return the text field
)
Vejamos os resultados da pesquisa da consulta
import json
retrieved_lines_with_distances = [
(res["entity"]["text"], res["distance"]) for res in search_res[0]
]
print(json.dumps(retrieved_lines_with_distances, indent=4))
[
[
" Where does Milvus store data?\n\nMilvus deals with two types of data, inserted data and metadata. \n\nInserted data, including vector data, scalar data, and collection-specific schema, are stored in persistent storage as incremental log. Milvus supports multiple object storage backends, including [MinIO](https://min.io/), [AWS S3](https://aws.amazon.com/s3/?nc1=h_ls), [Google Cloud Storage](https://cloud.google.com/storage?hl=en#object-storage-for-companies-of-all-sizes) (GCS), [Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs), [Alibaba Cloud OSS](https://www.alibabacloud.com/product/object-storage-service), and [Tencent Cloud Object Storage](https://www.tencentcloud.com/products/cos) (COS).\n\nMetadata are generated within Milvus. Each Milvus module has its own metadata that are stored in etcd.\n\n###",
0.6572665572166443
],
[
"How does Milvus flush data?\n\nMilvus returns success when inserted data are loaded to the message queue. However, the data are not yet flushed to the disk. Then Milvus' data node writes the data in the message queue to persistent storage as incremental logs. If `flush()` is called, the data node is forced to write all data in the message queue to persistent storage immediately.\n\n###",
0.6312146186828613
],
[
"How does Milvus handle vector data types and precision?\n\nMilvus supports Binary, Float32, Float16, and BFloat16 vector types.\n\n- Binary vectors: Store binary data as sequences of 0s and 1s, used in image processing and information retrieval.\n- Float32 vectors: Default storage with a precision of about 7 decimal digits. Even Float64 values are stored with Float32 precision, leading to potential precision loss upon retrieval.\n- Float16 and BFloat16 vectors: Offer reduced precision and memory usage. Float16 is suitable for applications with limited bandwidth and storage, while BFloat16 balances range and efficiency, commonly used in deep learning to reduce computational requirements without significantly impacting accuracy.\n\n###",
0.6115777492523193
]
]
Utilizar o LLM para obter uma resposta RAG
Converter os documentos recuperados num formato de cadeia de caracteres.
context = "\n".join(
[line_with_distance[0] for line_with_distance in retrieved_lines_with_distances]
)
Definir avisos do sistema e do utilizador para o Modelo de Linguagem. Este prompt é montado com os documentos recuperados do Milvus.
SYSTEM_PROMPT = """
Human: You are an AI assistant. You are able to find answers to the questions from the contextual passage snippets provided.
"""
USER_PROMPT = f"""
Use the following pieces of information enclosed in <context> tags to provide an answer to the question enclosed in <question> tags.
<context>
{context}
</context>
<question>
{question}
</question>
"""
Use o modelo deepseek-chat
fornecido pelo DeepSeek para gerar uma resposta com base nos prompts.
response = deepseek_client.chat.completions.create(
model="deepseek-chat",
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": USER_PROMPT},
],
)
print(response.choices[0].message.content)
In Milvus, data is stored in two main categories: inserted data and metadata.
1. **Inserted Data**: This includes vector data, scalar data, and collection-specific schema. The inserted data is stored in persistent storage as incremental logs. Milvus supports various object storage backends for this purpose, such as MinIO, AWS S3, Google Cloud Storage (GCS), Azure Blob Storage, Alibaba Cloud OSS, and Tencent Cloud Object Storage (COS).
2. **Metadata**: Metadata is generated within Milvus and is specific to each Milvus module. This metadata is stored in etcd, a distributed key-value store.
Additionally, when data is inserted, it is first loaded into a message queue, and Milvus returns success at this stage. The data is then written to persistent storage as incremental logs by the data node. If the `flush()` function is called, the data node is forced to write all data in the message queue to persistent storage immediately.
Ótimo! Construímos com sucesso um pipeline RAG com o Milvus e o DeepSeek.