Criar RAG com Milvus e Fireworks AI
OFireworks AI é uma plataforma de inferência de IA generativa que oferece velocidade e prontidão de produção líderes do setor para executar e personalizar modelos. O Fireworks AI fornece uma variedade de serviços de IA generativa, incluindo modelos sem servidor, implantações sob demanda e recursos de ajuste fino. Oferece um ambiente abrangente para a implementação de vários modelos de IA, incluindo modelos de linguagem grande (LLMs) e modelos de incorporação. O Fireworks AI agrega vários modelos, permitindo que os utilizadores acedam e utilizem facilmente estes recursos sem a necessidade de uma configuração extensiva da infraestrutura.
Neste tutorial, vamos mostrar-lhe como construir um pipeline RAG (Retrieval-Augmented Generation) com o Milvus e o Fireworks AI.
Preparação
Dependências e ambiente
$ pip install --upgrade pymilvus openai requests tqdm
Se estiver a utilizar o Google Colab, para ativar as dependências que acabou de instalar, poderá ter de reiniciar o tempo de execução (clique no menu "Tempo de execução" na parte superior do ecrã e selecione "Reiniciar sessão" no menu pendente).
O Fireworks AI ativa a API de estilo OpenAI. Pode iniciar sessão no seu sítio Web oficial e preparar a chave api FIREWORKS_API_KEY
como variável de ambiente.
import os
os.environ["FIREWORKS_API_KEY"] = "***********"
Preparar os dados
Utilizamos as páginas de FAQ da Documentação do Milvus 2.4.x como conhecimento privado no nosso RAG, que é uma boa fonte de dados para um pipeline RAG simples.
Descarregue o ficheiro zip e extraia os documentos para a pasta milvus_docs
.
$ wget https://github.com/milvus-io/milvus-docs/releases/download/v2.4.6-preview/milvus_docs_2.4.x_en.zip
$ unzip -q milvus_docs_2.4.x_en.zip -d milvus_docs
Carregamos todos os ficheiros markdown da pasta milvus_docs/en/faq
. Para cada documento, utilizamos simplesmente "#" para separar o conteúdo do ficheiro, o que permite separar aproximadamente o conteúdo de cada parte principal do ficheiro markdown.
from glob import glob
text_lines = []
for file_path in glob("milvus_docs/en/faq/*.md", recursive=True):
with open(file_path, "r") as file:
file_text = file.read()
text_lines += file_text.split("# ")
Preparar o LLM e o modelo de incorporação
Inicializamos um cliente para preparar o LLM e o modelo de incorporação. O Fireworks AI habilita a API no estilo OpenAI, e você pode usar a mesma API com pequenos ajustes para chamar o modelo de incorporação e o LLM.
from openai import OpenAI
fireworks_client = OpenAI(
api_key=os.environ["FIREWORKS_API_KEY"],
base_url="https://api.fireworks.ai/inference/v1",
)
Defina uma função para gerar embeddings de texto utilizando o cliente. Utilizamos o modelo nomic-ai/nomic-embed-text-v1.5
como exemplo.
def emb_text(text):
return (
fireworks_client.embeddings.create(
input=text, model="nomic-ai/nomic-embed-text-v1.5"
)
.data[0]
.embedding
)
Gere um embedding de teste e imprima a sua dimensão e os primeiros elementos.
test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
768
[0.04815673828125, 0.0261993408203125, -0.1749267578125, -0.03131103515625, 0.068115234375, -0.00621795654296875, 0.03955078125, -0.0210723876953125, 0.039703369140625, -0.0286102294921875]
Carregar dados no Milvus
Criar a coleção
from pymilvus import MilvusClient
milvus_client = MilvusClient(uri="./milvus_demo.db")
collection_name = "my_rag_collection"
Quanto ao argumento de MilvusClient
:
- Definir o
uri
como um ficheiro local, por exemplo./milvus.db
, é o método mais conveniente, pois utiliza automaticamente o Milvus Lite para armazenar todos os dados neste ficheiro. - Se tiver uma grande escala de dados, pode configurar um servidor Milvus mais eficiente em docker ou kubernetes. Nesta configuração, utilize o uri do servidor, por exemplo,
http://localhost:19530
, como o seuuri
. - Se pretender utilizar o Zilliz Cloud, o serviço de nuvem totalmente gerido para o Milvus, ajuste os endereços
uri
etoken
, que correspondem ao Public Endpoint e à chave Api no Zilliz Cloud.
Verificar se a coleção já existe e eliminá-la se existir.
if milvus_client.has_collection(collection_name):
milvus_client.drop_collection(collection_name)
Criar uma nova coleção com os parâmetros especificados.
Se não especificarmos qualquer informação de campo, o Milvus criará automaticamente um campo id
por defeito para a chave primária e um campo vector
para armazenar os dados vectoriais. Um campo JSON reservado é utilizado para armazenar campos não definidos pelo esquema e os respectivos valores.
milvus_client.create_collection(
collection_name=collection_name,
dimension=embedding_dim,
metric_type="IP", # Inner product distance
consistency_level="Strong", # Strong consistency level
)
Inserir dados
Itere pelas linhas de texto, crie embeddings e, em seguida, insira os dados no Milvus.
Aqui está um novo campo text
, que é um campo não definido no esquema da coleção. Será automaticamente adicionado ao campo dinâmico JSON reservado, que pode ser tratado como um campo normal a um nível elevado.
from tqdm import tqdm
data = []
for i, line in enumerate(tqdm(text_lines, desc="Creating embeddings")):
data.append({"id": i, "vector": emb_text(line), "text": line})
milvus_client.insert(collection_name=collection_name, data=data)
Creating embeddings: 100%|██████████| 72/72 [00:28<00:00, 2.51it/s]
{'insert_count': 72, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71], 'cost': 0}
Construir RAG
Recuperar dados para uma consulta
Vamos especificar uma pergunta frequente sobre o Milvus.
question = "How is data stored in milvus?"
Pesquise a pergunta na coleção e obtenha as 3 principais correspondências semânticas.
search_res = milvus_client.search(
collection_name=collection_name,
data=[
emb_text(question)
], # Use the `emb_text` function to convert the question to an embedding vector
limit=3, # Return top 3 results
search_params={"metric_type": "IP", "params": {}}, # Inner product distance
output_fields=["text"], # Return the text field
)
Vejamos os resultados da pesquisa da consulta
import json
retrieved_lines_with_distances = [
(res["entity"]["text"], res["distance"]) for res in search_res[0]
]
print(json.dumps(retrieved_lines_with_distances, indent=4))
[
[
" Where does Milvus store data?\n\nMilvus deals with two types of data, inserted data and metadata. \n\nInserted data, including vector data, scalar data, and collection-specific schema, are stored in persistent storage as incremental log. Milvus supports multiple object storage backends, including [MinIO](https://min.io/), [AWS S3](https://aws.amazon.com/s3/?nc1=h_ls), [Google Cloud Storage](https://cloud.google.com/storage?hl=en#object-storage-for-companies-of-all-sizes) (GCS), [Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs), [Alibaba Cloud OSS](https://www.alibabacloud.com/product/object-storage-service), and [Tencent Cloud Object Storage](https://www.tencentcloud.com/products/cos) (COS).\n\nMetadata are generated within Milvus. Each Milvus module has its own metadata that are stored in etcd.\n\n###",
0.8334928750991821
],
[
"How does Milvus flush data?\n\nMilvus returns success when inserted data are loaded to the message queue. However, the data are not yet flushed to the disk. Then Milvus' data node writes the data in the message queue to persistent storage as incremental logs. If `flush()` is called, the data node is forced to write all data in the message queue to persistent storage immediately.\n\n###",
0.746377170085907
],
[
"What is the maximum dataset size Milvus can handle?\n\n \nTheoretically, the maximum dataset size Milvus can handle is determined by the hardware it is run on, specifically system memory and storage:\n\n- Milvus loads all specified collections and partitions into memory before running queries. Therefore, memory size determines the maximum amount of data Milvus can query.\n- When new entities and and collection-related schema (currently only MinIO is supported for data persistence) are added to Milvus, system storage determines the maximum allowable size of inserted data.\n\n###",
0.7328270673751831
]
]
Utilizar o LLM para obter uma resposta RAG
Converter os documentos recuperados num formato de cadeia de caracteres.
context = "\n".join(
[line_with_distance[0] for line_with_distance in retrieved_lines_with_distances]
)
Definir avisos do sistema e do utilizador para o Modelo de Linguagem. Este prompt é montado com os documentos recuperados do Milvus.
SYSTEM_PROMPT = """
Human: You are an AI assistant. You are able to find answers to the questions from the contextual passage snippets provided.
"""
USER_PROMPT = f"""
Use the following pieces of information enclosed in <context> tags to provide an answer to the question enclosed in <question> tags.
<context>
{context}
</context>
<question>
{question}
</question>
"""
Utilize o modelo llama-v3p1-405b-instruct
fornecido pelo Fireworks para gerar uma resposta com base nos avisos.
response = fireworks_client.chat.completions.create(
model="accounts/fireworks/models/llama-v3p1-405b-instruct",
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": USER_PROMPT},
],
)
print(response.choices[0].message.content)
According to the provided context, Milvus stores data in two ways:
1. Inserted data, including vector data, scalar data, and collection-specific schema, are stored in persistent storage as incremental logs. This can be done using multiple object storage backends such as MinIO, AWS S3, Google Cloud Storage, Azure Blob Storage, Alibaba Cloud OSS, and Tencent Cloud Object Storage.
2. Metadata, which are generated within Milvus, are stored in etcd, with each Milvus module having its own metadata.
Additionally, when data is inserted, it is first loaded into a message queue, and then written to persistent storage as incremental logs by the data node. The `flush()` function can be used to force the data node to write all data in the message queue to persistent storage immediately.
Ótimo! Criámos com êxito um pipeline RAG com o Milvus e o Fireworks AI.