Construir RAG com Milvus e SiliconFlow
ASiliconFlow está empenhada em construir uma plataforma de IA Infra escalável, padronizada e de alto desempenho. A SiliconCloud é uma das principais ofertas da SiliconFlow, descrita como uma plataforma Model as a Service (MaaS). Fornece um ambiente abrangente para a implementação de vários modelos de IA, incluindo modelos de linguagem de grande dimensão (LLM) e modelos de incorporação. O SiliconCloud agrega vários modelos de código aberto, permitindo que os utilizadores acedam e utilizem facilmente estes recursos sem a necessidade de uma configuração de infraestrutura extensa.
Neste tutorial, mostraremos como construir um pipeline RAG (Retrieval-Augmented Generation) com Milvus e SiliconFlow.
Preparação
Dependências e ambiente
$ pip install --upgrade pymilvus openai requests tqdm
Se estiver a utilizar o Google Colab, para ativar as dependências que acabou de instalar, poderá ter de reiniciar o tempo de execução (clique no menu "Tempo de execução" na parte superior do ecrã e selecione "Reiniciar sessão" no menu pendente).
O SiliconFlow ativa a API de estilo OpenAI. Pode iniciar sessão no seu sítio Web oficial e preparar a chave api SILICON_FLOW_API_KEY
como variável de ambiente.
import os
os.environ["SILICON_FLOW_API_KEY"] = "***********"
Preparar os dados
Utilizamos as páginas de FAQ da Documentação Milvus 2.4.x como conhecimento privado no nosso RAG, que é uma boa fonte de dados para um pipeline RAG simples.
Descarregue o ficheiro zip e extraia os documentos para a pasta milvus_docs
.
$ wget https://github.com/milvus-io/milvus-docs/releases/download/v2.4.6-preview/milvus_docs_2.4.x_en.zip
$ unzip -q milvus_docs_2.4.x_en.zip -d milvus_docs
Carregamos todos os ficheiros markdown da pasta milvus_docs/en/faq
. Para cada documento, utilizamos simplesmente "#" para separar o conteúdo do ficheiro, o que pode separar aproximadamente o conteúdo de cada parte principal do ficheiro markdown.
from glob import glob
text_lines = []
for file_path in glob("milvus_docs/en/faq/*.md", recursive=True):
with open(file_path, "r") as file:
file_text = file.read()
text_lines += file_text.split("# ")
Preparar o modelo de incorporação
Inicializamos um cliente para preparar o modelo de incorporação. O SiliconFlow habilita a API no estilo OpenAI, e você pode usar a mesma API com pequenos ajustes para chamar o modelo de incorporação e o LLM.
from openai import OpenAI
siliconflow_client = OpenAI(
api_key=os.environ["SILICON_FLOW_API_KEY"], base_url="https://api.siliconflow.cn/v1"
)
Defina uma função para gerar embeddings de texto usando o cliente. Utilizamos o modelo BAAI/bge-large-en-v1.5
como exemplo.
def emb_text(text):
return (
siliconflow_client.embeddings.create(input=text, model="BAAI/bge-large-en-v1.5")
.data[0]
.embedding
)
Gere um embedding de teste e imprima a sua dimensão e os primeiros elementos.
test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
1024
[0.011475468054413795, 0.02982141077518463, 0.0038535362109541893, 0.035921916365623474, -0.0159175843000412, -0.014918108470737934, -0.018094222992658615, -0.002937349723652005, 0.030917132273316383, 0.03390815854072571]
Carregar dados no Milvus
Criar a coleção
from pymilvus import MilvusClient
milvus_client = MilvusClient(uri="./milvus_demo.db")
collection_name = "my_rag_collection"
Quanto ao argumento de MilvusClient
:
- Definir o
uri
como um ficheiro local, por exemplo,./milvus.db
, é o método mais conveniente, pois utiliza automaticamente o Milvus Lite para armazenar todos os dados neste ficheiro. - Se tiver uma grande escala de dados, pode configurar um servidor Milvus mais eficiente em docker ou kubernetes. Nesta configuração, utilize o uri do servidor, por exemplo,
http://localhost:19530
, como o seuuri
. - Se pretender utilizar o Zilliz Cloud, o serviço de nuvem totalmente gerido para o Milvus, ajuste os endereços
uri
etoken
, que correspondem ao Public Endpoint e à chave Api no Zilliz Cloud.
Verificar se a coleção já existe e eliminá-la se existir.
if milvus_client.has_collection(collection_name):
milvus_client.drop_collection(collection_name)
Criar uma nova coleção com os parâmetros especificados.
Se não especificarmos qualquer informação de campo, o Milvus criará automaticamente um campo id
por defeito para a chave primária e um campo vector
para armazenar os dados vectoriais. Um campo JSON reservado é utilizado para armazenar campos não definidos pelo esquema e os respectivos valores.
milvus_client.create_collection(
collection_name=collection_name,
dimension=embedding_dim,
metric_type="IP", # Inner product distance
consistency_level="Strong", # Strong consistency level
)
Inserir dados
Itere pelas linhas de texto, crie embeddings e, em seguida, insira os dados no Milvus.
Aqui está um novo campo text
, que é um campo não definido no esquema da coleção. Será automaticamente adicionado ao campo dinâmico JSON reservado, que pode ser tratado como um campo normal a um nível elevado.
from tqdm import tqdm
data = []
for i, line in enumerate(tqdm(text_lines, desc="Creating embeddings")):
data.append({"id": i, "vector": emb_text(line), "text": line})
milvus_client.insert(collection_name=collection_name, data=data)
Creating embeddings: 100%|██████████| 72/72 [00:04<00:00, 16.97it/s]
{'insert_count': 72, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71], 'cost': 0}
Construir RAG
Recuperar dados para uma consulta
Vamos especificar uma pergunta frequente sobre o Milvus.
question = "How is data stored in milvus?"
Pesquise a pergunta na coleção e obtenha as 3 principais correspondências semânticas.
search_res = milvus_client.search(
collection_name=collection_name,
data=[
emb_text(question)
], # Use the `emb_text` function to convert the question to an embedding vector
limit=3, # Return top 3 results
search_params={"metric_type": "IP", "params": {}}, # Inner product distance
output_fields=["text"], # Return the text field
)
Vejamos os resultados da pesquisa da consulta
import json
retrieved_lines_with_distances = [
(res["entity"]["text"], res["distance"]) for res in search_res[0]
]
print(json.dumps(retrieved_lines_with_distances, indent=4))
[
[
" Where does Milvus store data?\n\nMilvus deals with two types of data, inserted data and metadata. \n\nInserted data, including vector data, scalar data, and collection-specific schema, are stored in persistent storage as incremental log. Milvus supports multiple object storage backends, including [MinIO](https://min.io/), [AWS S3](https://aws.amazon.com/s3/?nc1=h_ls), [Google Cloud Storage](https://cloud.google.com/storage?hl=en#object-storage-for-companies-of-all-sizes) (GCS), [Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs), [Alibaba Cloud OSS](https://www.alibabacloud.com/product/object-storage-service), and [Tencent Cloud Object Storage](https://www.tencentcloud.com/products/cos) (COS).\n\nMetadata are generated within Milvus. Each Milvus module has its own metadata that are stored in etcd.\n\n###",
0.833885133266449
],
[
"How does Milvus flush data?\n\nMilvus returns success when inserted data are loaded to the message queue. However, the data are not yet flushed to the disk. Then Milvus' data node writes the data in the message queue to persistent storage as incremental logs. If `flush()` is called, the data node is forced to write all data in the message queue to persistent storage immediately.\n\n###",
0.812842607498169
],
[
"Does the query perform in memory? What are incremental data and historical data?\n\nYes. When a query request comes, Milvus searches both incremental data and historical data by loading them into memory. Incremental data are in the growing segments, which are buffered in memory before they reach the threshold to be persisted in storage engine, while historical data are from the sealed segments that are stored in the object storage. Incremental data and historical data together constitute the whole dataset to search.\n\n###",
0.7714196443557739
]
]
Utilizar o LLM para obter uma resposta RAG
Converter os documentos recuperados num formato de cadeia de caracteres.
context = "\n".join(
[line_with_distance[0] for line_with_distance in retrieved_lines_with_distances]
)
Definir avisos do sistema e do utilizador para o Modelo de Linguagem. Este prompt é montado com os documentos recuperados do Milvus.
SYSTEM_PROMPT = """
Human: You are an AI assistant. You are able to find answers to the questions from the contextual passage snippets provided.
"""
USER_PROMPT = f"""
Use the following pieces of information enclosed in <context> tags to provide an answer to the question enclosed in <question> tags.
<context>
{context}
</context>
<question>
{question}
</question>
"""
Utilize o modelo deepseek-ai/DeepSeek-V2.5
fornecido pelo SiliconCloud para gerar uma resposta com base nos avisos.
response = siliconflow_client.chat.completions.create(
model="deepseek-ai/DeepSeek-V2.5",
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": USER_PROMPT},
],
)
print(response.choices[0].message.content)
In Milvus, data is stored in two main categories: inserted data and metadata.
- **Inserted Data**: This includes vector data, scalar data, and collection-specific schema, which are stored in persistent storage as incremental logs. Milvus supports various object storage backends such as MinIO, AWS S3, Google Cloud Storage (GCS), Azure Blob Storage, Alibaba Cloud OSS, and Tencent Cloud Object Storage (COS).
- **Metadata**: This is generated within Milvus, with each module having its own metadata stored in etcd, a distributed key-value store.
Ótimo! Construímos com sucesso um pipeline RAG com o Milvus e o SiliconFlow.