RAG mit Milvus und SiliconFlow aufbauen
SiliconFlow hat es sich zur Aufgabe gemacht, eine skalierbare, standardisierte und hochleistungsfähige KI-Infra-Plattform aufzubauen. SiliconCloud ist eines der Flaggschiffe von SiliconFlow und wird als Model-as-a-Service (MaaS)-Plattform bezeichnet. Sie bietet eine umfassende Umgebung für den Einsatz verschiedener KI-Modelle, einschließlich großer Sprachmodelle (LLMs) und Einbettungsmodelle. SiliconCloud fasst zahlreiche Open-Source-Modelle zusammen und ermöglicht es den Nutzern, auf diese Ressourcen einfach zuzugreifen und sie zu nutzen, ohne eine umfangreiche Infrastruktur einrichten zu müssen.
In diesem Tutorial zeigen wir Ihnen, wie Sie eine RAG-Pipeline (Retrieval-Augmented Generation) mit Milvus und SiliconFlow erstellen.
Vorbereitung
Abhängigkeiten und Umgebung
$ pip install --upgrade pymilvus openai requests tqdm
Wenn Sie Google Colab verwenden, müssen Sie möglicherweise die Runtime neu starten, um die soeben installierten Abhängigkeiten zu aktivieren (klicken Sie auf das Menü "Runtime" am oberen Bildschirmrand und wählen Sie "Restart session" aus dem Dropdown-Menü).
SiliconFlow aktiviert die OpenAI-ähnliche API. Sie können sich auf der offiziellen Website anmelden und den API-Schlüssel SILICON_FLOW_API_KEY
als Umgebungsvariable vorbereiten.
import os
os.environ["SILICON_FLOW_API_KEY"] = "***********"
Bereiten Sie die Daten vor
Wir verwenden die FAQ-Seiten aus der Milvus-Dokumentation 2.4.x als privates Wissen in unserem RAG, was eine gute Datenquelle für eine einfache RAG-Pipeline ist.
Laden Sie die Zip-Datei herunter und entpacken Sie die Dokumente in den Ordner milvus_docs
.
$ wget https://github.com/milvus-io/milvus-docs/releases/download/v2.4.6-preview/milvus_docs_2.4.x_en.zip
$ unzip -q milvus_docs_2.4.x_en.zip -d milvus_docs
Wir laden alle Markdown-Dateien aus dem Ordner milvus_docs/en/faq
. Für jedes Dokument verwenden wir einfach "# ", um den Inhalt in der Datei zu trennen, wodurch der Inhalt jedes Hauptteils der Markdown-Datei grob getrennt werden kann.
from glob import glob
text_lines = []
for file_path in glob("milvus_docs/en/faq/*.md", recursive=True):
with open(file_path, "r") as file:
file_text = file.read()
text_lines += file_text.split("# ")
Vorbereiten des Einbettungsmodells
Wir initialisieren einen Client, um das Einbettungsmodell vorzubereiten. SiliconFlow ermöglicht die API im OpenAI-Stil, und Sie können dieselbe API mit geringfügigen Anpassungen verwenden, um das Einbettungsmodell und den LLM aufzurufen.
from openai import OpenAI
siliconflow_client = OpenAI(
api_key=os.environ["SILICON_FLOW_API_KEY"], base_url="https://api.siliconflow.cn/v1"
)
Definieren Sie eine Funktion zur Erzeugung von Texteinbettungen mithilfe des Clients. Wir verwenden das Modell BAAI/bge-large-en-v1.5
als Beispiel.
def emb_text(text):
return (
siliconflow_client.embeddings.create(input=text, model="BAAI/bge-large-en-v1.5")
.data[0]
.embedding
)
Erzeugen Sie eine Testeinbettung und geben Sie deren Dimension und die ersten Elemente aus.
test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
1024
[0.011475468054413795, 0.02982141077518463, 0.0038535362109541893, 0.035921916365623474, -0.0159175843000412, -0.014918108470737934, -0.018094222992658615, -0.002937349723652005, 0.030917132273316383, 0.03390815854072571]
Laden der Daten in Milvus
Erstellen Sie die Sammlung
from pymilvus import MilvusClient
milvus_client = MilvusClient(uri="./milvus_demo.db")
collection_name = "my_rag_collection"
Wie für das Argument von MilvusClient
:
- Das Einstellen von
uri
als lokale Datei, z.B../milvus.db
, ist die bequemste Methode, da sie automatisch Milvus Lite verwendet, um alle Daten in dieser Datei zu speichern. - Wenn Sie große Datenmengen haben, können Sie einen leistungsfähigeren Milvus-Server auf Docker oder Kubernetes einrichten. Bei dieser Einrichtung verwenden Sie bitte die Server-Uri, z. B.
http://localhost:19530
, alsuri
. - Wenn Sie Zilliz Cloud, den vollständig verwalteten Cloud-Service für Milvus, verwenden möchten, passen Sie
uri
undtoken
an, die dem öffentlichen Endpunkt und dem Api-Schlüssel in Zilliz Cloud entsprechen.
Prüfen Sie, ob die Sammlung bereits existiert und löschen Sie sie, wenn dies der Fall ist.
if milvus_client.has_collection(collection_name):
milvus_client.drop_collection(collection_name)
Erstellen Sie eine neue Sammlung mit den angegebenen Parametern.
Wenn wir keine Feldinformationen angeben, erstellt Milvus automatisch ein Standardfeld id
für den Primärschlüssel und ein Feld vector
zum Speichern der Vektordaten. Ein reserviertes JSON-Feld wird verwendet, um nicht schema-definierte Felder und ihre Werte zu speichern.
milvus_client.create_collection(
collection_name=collection_name,
dimension=embedding_dim,
metric_type="IP", # Inner product distance
consistency_level="Strong", # Strong consistency level
)
Daten einfügen
Iterieren Sie durch die Textzeilen, erstellen Sie Einbettungen und fügen Sie dann die Daten in Milvus ein.
Hier ist ein neues Feld text
, das ein nicht definiertes Feld im Sammelschema ist. Es wird automatisch dem reservierten dynamischen JSON-Feld hinzugefügt, das auf hoher Ebene wie ein normales Feld behandelt werden kann.
from tqdm import tqdm
data = []
for i, line in enumerate(tqdm(text_lines, desc="Creating embeddings")):
data.append({"id": i, "vector": emb_text(line), "text": line})
milvus_client.insert(collection_name=collection_name, data=data)
Creating embeddings: 100%|██████████| 72/72 [00:04<00:00, 16.97it/s]
{'insert_count': 72, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71], 'cost': 0}
RAG erstellen
Abrufen von Daten für eine Abfrage
Lassen Sie uns eine häufige Frage über Milvus angeben.
question = "How is data stored in milvus?"
Suchen Sie nach der Frage in der Sammlung und rufen Sie die semantischen Top-3-Treffer ab.
search_res = milvus_client.search(
collection_name=collection_name,
data=[
emb_text(question)
], # Use the `emb_text` function to convert the question to an embedding vector
limit=3, # Return top 3 results
search_params={"metric_type": "IP", "params": {}}, # Inner product distance
output_fields=["text"], # Return the text field
)
Werfen wir einen Blick auf die Suchergebnisse der Abfrage
import json
retrieved_lines_with_distances = [
(res["entity"]["text"], res["distance"]) for res in search_res[0]
]
print(json.dumps(retrieved_lines_with_distances, indent=4))
[
[
" Where does Milvus store data?\n\nMilvus deals with two types of data, inserted data and metadata. \n\nInserted data, including vector data, scalar data, and collection-specific schema, are stored in persistent storage as incremental log. Milvus supports multiple object storage backends, including [MinIO](https://min.io/), [AWS S3](https://aws.amazon.com/s3/?nc1=h_ls), [Google Cloud Storage](https://cloud.google.com/storage?hl=en#object-storage-for-companies-of-all-sizes) (GCS), [Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs), [Alibaba Cloud OSS](https://www.alibabacloud.com/product/object-storage-service), and [Tencent Cloud Object Storage](https://www.tencentcloud.com/products/cos) (COS).\n\nMetadata are generated within Milvus. Each Milvus module has its own metadata that are stored in etcd.\n\n###",
0.833885133266449
],
[
"How does Milvus flush data?\n\nMilvus returns success when inserted data are loaded to the message queue. However, the data are not yet flushed to the disk. Then Milvus' data node writes the data in the message queue to persistent storage as incremental logs. If `flush()` is called, the data node is forced to write all data in the message queue to persistent storage immediately.\n\n###",
0.812842607498169
],
[
"Does the query perform in memory? What are incremental data and historical data?\n\nYes. When a query request comes, Milvus searches both incremental data and historical data by loading them into memory. Incremental data are in the growing segments, which are buffered in memory before they reach the threshold to be persisted in storage engine, while historical data are from the sealed segments that are stored in the object storage. Incremental data and historical data together constitute the whole dataset to search.\n\n###",
0.7714196443557739
]
]
LLM verwenden, um eine RAG-Antwort zu erhalten
Konvertieren Sie die abgerufenen Dokumente in ein String-Format.
context = "\n".join(
[line_with_distance[0] for line_with_distance in retrieved_lines_with_distances]
)
Definieren Sie System- und Benutzer-Prompts für das Lanage Model. Diese Eingabeaufforderung wird mit den abgerufenen Dokumenten von Milvus zusammengestellt.
SYSTEM_PROMPT = """
Human: You are an AI assistant. You are able to find answers to the questions from the contextual passage snippets provided.
"""
USER_PROMPT = f"""
Use the following pieces of information enclosed in <context> tags to provide an answer to the question enclosed in <question> tags.
<context>
{context}
</context>
<question>
{question}
</question>
"""
Verwenden Sie das von SiliconCloud bereitgestellte deepseek-ai/DeepSeek-V2.5
Modell, um eine Antwort auf der Grundlage der Prompts zu generieren.
response = siliconflow_client.chat.completions.create(
model="deepseek-ai/DeepSeek-V2.5",
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": USER_PROMPT},
],
)
print(response.choices[0].message.content)
In Milvus, data is stored in two main categories: inserted data and metadata.
- **Inserted Data**: This includes vector data, scalar data, and collection-specific schema, which are stored in persistent storage as incremental logs. Milvus supports various object storage backends such as MinIO, AWS S3, Google Cloud Storage (GCS), Azure Blob Storage, Alibaba Cloud OSS, and Tencent Cloud Object Storage (COS).
- **Metadata**: This is generated within Milvus, with each module having its own metadata stored in etcd, a distributed key-value store.
Großartig! Wir haben erfolgreich eine RAG-Pipeline mit Milvus und SiliconFlow erstellt.