RAG mit Milvus und Fireworks AI aufbauen
Fireworks AI ist eine generative KI-Inferenzplattform, die branchenführende Geschwindigkeit und Produktionsbereitschaft für die Ausführung und Anpassung von Modellen bietet. Fireworks AI bietet eine Vielzahl von generativen KI-Diensten, einschließlich serverloser Modelle, On-Demand-Bereitstellungen und Feinabstimmungsfunktionen. Es bietet eine umfassende Umgebung für den Einsatz verschiedener KI-Modelle, einschließlich großer Sprachmodelle (LLMs) und Einbettungsmodelle. Fireworks AI fasst zahlreiche Modelle zusammen und ermöglicht den Benutzern den einfachen Zugriff und die Nutzung dieser Ressourcen, ohne dass eine umfangreiche Infrastruktur eingerichtet werden muss.
In diesem Tutorial zeigen wir Ihnen, wie Sie eine RAG-Pipeline (Retrieval-Augmented Generation) mit Milvus und Fireworks AI aufbauen.
Vorbereitung
Abhängigkeiten und Umgebung
$ pip install --upgrade pymilvus openai requests tqdm
Wenn Sie Google Colab verwenden, müssen Sie möglicherweise die Runtime neu starten, um die gerade installierten Abhängigkeiten zu aktivieren (klicken Sie auf das Menü "Runtime" am oberen Bildschirmrand und wählen Sie "Restart session" aus dem Dropdown-Menü).
Fireworks AI aktiviert die OpenAI-ähnliche API. Sie können sich auf der offiziellen Website anmelden und den API-Schlüssel FIREWORKS_API_KEY
als Umgebungsvariable vorbereiten.
import os
os.environ["FIREWORKS_API_KEY"] = "***********"
Bereiten Sie die Daten vor
Wir verwenden die FAQ-Seiten aus der Milvus-Dokumentation 2.4.x als privates Wissen in unserer RAG, was eine gute Datenquelle für eine einfache RAG-Pipeline ist.
Laden Sie die Zip-Datei herunter und entpacken Sie die Dokumente in den Ordner milvus_docs
.
$ wget https://github.com/milvus-io/milvus-docs/releases/download/v2.4.6-preview/milvus_docs_2.4.x_en.zip
$ unzip -q milvus_docs_2.4.x_en.zip -d milvus_docs
Wir laden alle Markdown-Dateien aus dem Ordner milvus_docs/en/faq
. Für jedes Dokument verwenden wir einfach "# ", um den Inhalt in der Datei zu trennen, wodurch der Inhalt jedes Hauptteils der Markdown-Datei grob getrennt werden kann.
from glob import glob
text_lines = []
for file_path in glob("milvus_docs/en/faq/*.md", recursive=True):
with open(file_path, "r") as file:
file_text = file.read()
text_lines += file_text.split("# ")
Vorbereiten des LLM und des Einbettungsmodells
Wir initialisieren einen Client, um das LLM- und Einbettungsmodell vorzubereiten. Fireworks AI ermöglicht die API im Stil von OpenAI, und Sie können dieselbe API mit geringfügigen Anpassungen verwenden, um das Einbettungsmodell und das LLM aufzurufen.
from openai import OpenAI
fireworks_client = OpenAI(
api_key=os.environ["FIREWORKS_API_KEY"],
base_url="https://api.fireworks.ai/inference/v1",
)
Definieren Sie eine Funktion zur Erzeugung von Texteinbettungen mit Hilfe des Clients. Wir verwenden das Modell nomic-ai/nomic-embed-text-v1.5
als Beispiel.
def emb_text(text):
return (
fireworks_client.embeddings.create(
input=text, model="nomic-ai/nomic-embed-text-v1.5"
)
.data[0]
.embedding
)
Erzeugen Sie eine Testeinbettung und geben Sie deren Dimension und die ersten Elemente aus.
test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
768
[0.04815673828125, 0.0261993408203125, -0.1749267578125, -0.03131103515625, 0.068115234375, -0.00621795654296875, 0.03955078125, -0.0210723876953125, 0.039703369140625, -0.0286102294921875]
Laden der Daten in Milvus
Erstellen Sie die Sammlung
from pymilvus import MilvusClient
milvus_client = MilvusClient(uri="./milvus_demo.db")
collection_name = "my_rag_collection"
Wie für das Argument von MilvusClient
:
- Das Einstellen von
uri
als lokale Datei, z.B../milvus.db
, ist die bequemste Methode, da sie automatisch Milvus Lite verwendet, um alle Daten in dieser Datei zu speichern. - Wenn Sie große Datenmengen haben, können Sie einen leistungsfähigeren Milvus-Server auf Docker oder Kubernetes einrichten. Bei dieser Einrichtung verwenden Sie bitte die Server-Uri, z. B.
http://localhost:19530
, alsuri
. - Wenn Sie Zilliz Cloud, den vollständig verwalteten Cloud-Service für Milvus, verwenden möchten, passen Sie
uri
undtoken
an, die dem öffentlichen Endpunkt und dem Api-Schlüssel in Zilliz Cloud entsprechen.
Prüfen Sie, ob die Sammlung bereits existiert und löschen Sie sie, wenn dies der Fall ist.
if milvus_client.has_collection(collection_name):
milvus_client.drop_collection(collection_name)
Erstellen Sie eine neue Sammlung mit den angegebenen Parametern.
Wenn wir keine Feldinformationen angeben, erstellt Milvus automatisch ein Standardfeld id
für den Primärschlüssel und ein Feld vector
zum Speichern der Vektordaten. Ein reserviertes JSON-Feld wird verwendet, um nicht schema-definierte Felder und ihre Werte zu speichern.
milvus_client.create_collection(
collection_name=collection_name,
dimension=embedding_dim,
metric_type="IP", # Inner product distance
consistency_level="Strong", # Strong consistency level
)
Daten einfügen
Iterieren Sie durch die Textzeilen, erstellen Sie Einbettungen und fügen Sie dann die Daten in Milvus ein.
Hier ist ein neues Feld text
, das ein nicht definiertes Feld im Sammelschema ist. Es wird automatisch dem reservierten dynamischen JSON-Feld hinzugefügt, das auf hoher Ebene wie ein normales Feld behandelt werden kann.
from tqdm import tqdm
data = []
for i, line in enumerate(tqdm(text_lines, desc="Creating embeddings")):
data.append({"id": i, "vector": emb_text(line), "text": line})
milvus_client.insert(collection_name=collection_name, data=data)
Creating embeddings: 100%|██████████| 72/72 [00:28<00:00, 2.51it/s]
{'insert_count': 72, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71], 'cost': 0}
RAG erstellen
Abrufen von Daten für eine Abfrage
Lassen Sie uns eine häufige Frage über Milvus angeben.
question = "How is data stored in milvus?"
Suchen Sie nach der Frage in der Sammlung und rufen Sie die semantischen Top-3-Treffer ab.
search_res = milvus_client.search(
collection_name=collection_name,
data=[
emb_text(question)
], # Use the `emb_text` function to convert the question to an embedding vector
limit=3, # Return top 3 results
search_params={"metric_type": "IP", "params": {}}, # Inner product distance
output_fields=["text"], # Return the text field
)
Werfen wir einen Blick auf die Suchergebnisse der Abfrage
import json
retrieved_lines_with_distances = [
(res["entity"]["text"], res["distance"]) for res in search_res[0]
]
print(json.dumps(retrieved_lines_with_distances, indent=4))
[
[
" Where does Milvus store data?\n\nMilvus deals with two types of data, inserted data and metadata. \n\nInserted data, including vector data, scalar data, and collection-specific schema, are stored in persistent storage as incremental log. Milvus supports multiple object storage backends, including [MinIO](https://min.io/), [AWS S3](https://aws.amazon.com/s3/?nc1=h_ls), [Google Cloud Storage](https://cloud.google.com/storage?hl=en#object-storage-for-companies-of-all-sizes) (GCS), [Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs), [Alibaba Cloud OSS](https://www.alibabacloud.com/product/object-storage-service), and [Tencent Cloud Object Storage](https://www.tencentcloud.com/products/cos) (COS).\n\nMetadata are generated within Milvus. Each Milvus module has its own metadata that are stored in etcd.\n\n###",
0.8334928750991821
],
[
"How does Milvus flush data?\n\nMilvus returns success when inserted data are loaded to the message queue. However, the data are not yet flushed to the disk. Then Milvus' data node writes the data in the message queue to persistent storage as incremental logs. If `flush()` is called, the data node is forced to write all data in the message queue to persistent storage immediately.\n\n###",
0.746377170085907
],
[
"What is the maximum dataset size Milvus can handle?\n\n \nTheoretically, the maximum dataset size Milvus can handle is determined by the hardware it is run on, specifically system memory and storage:\n\n- Milvus loads all specified collections and partitions into memory before running queries. Therefore, memory size determines the maximum amount of data Milvus can query.\n- When new entities and and collection-related schema (currently only MinIO is supported for data persistence) are added to Milvus, system storage determines the maximum allowable size of inserted data.\n\n###",
0.7328270673751831
]
]
LLM verwenden, um eine RAG-Antwort zu erhalten
Konvertieren Sie die abgerufenen Dokumente in ein String-Format.
context = "\n".join(
[line_with_distance[0] for line_with_distance in retrieved_lines_with_distances]
)
Definieren Sie System- und Benutzer-Prompts für das Lanage Model. Diese Eingabeaufforderung wird mit den abgerufenen Dokumenten von Milvus zusammengestellt.
SYSTEM_PROMPT = """
Human: You are an AI assistant. You are able to find answers to the questions from the contextual passage snippets provided.
"""
USER_PROMPT = f"""
Use the following pieces of information enclosed in <context> tags to provide an answer to the question enclosed in <question> tags.
<context>
{context}
</context>
<question>
{question}
</question>
"""
Verwenden Sie das von Fireworks bereitgestellte llama-v3p1-405b-instruct
Modell, um eine Antwort auf der Grundlage der Prompts zu generieren.
response = fireworks_client.chat.completions.create(
model="accounts/fireworks/models/llama-v3p1-405b-instruct",
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": USER_PROMPT},
],
)
print(response.choices[0].message.content)
According to the provided context, Milvus stores data in two ways:
1. Inserted data, including vector data, scalar data, and collection-specific schema, are stored in persistent storage as incremental logs. This can be done using multiple object storage backends such as MinIO, AWS S3, Google Cloud Storage, Azure Blob Storage, Alibaba Cloud OSS, and Tencent Cloud Object Storage.
2. Metadata, which are generated within Milvus, are stored in etcd, with each Milvus module having its own metadata.
Additionally, when data is inserted, it is first loaded into a message queue, and then written to persistent storage as incremental logs by the data node. The `flush()` function can be used to force the data node to write all data in the message queue to persistent storage immediately.
Großartig! Wir haben erfolgreich eine RAG-Pipeline mit Milvus und Fireworks AI erstellt.