Come iniziare con Dynamiq e Milvus
Dynamiq è un potente framework Gen AI che semplifica lo sviluppo di applicazioni basate sull'AI. Grazie al solido supporto per gli agenti RAG (retrieval-augmented generation) e LLM (large language model), Dynamiq consente agli sviluppatori di creare sistemi intelligenti e dinamici con facilità ed efficienza.
In questo tutorial, vedremo come utilizzare Dynamiq con Milvus, il database vettoriale ad alte prestazioni creato appositamente per i flussi di lavoro RAG. Milvus eccelle per l'archiviazione, l'indicizzazione e il recupero efficiente delle incorporazioni vettoriali, rendendolo un componente indispensabile per i sistemi di intelligenza artificiale che richiedono un accesso rapido e preciso ai dati contestuali.
Questa guida passo passo coprirà due flussi di lavoro RAG fondamentali:
Flusso di indicizzazione dei documenti: imparare a elaborare i file di input (ad esempio, i PDF), trasformare il loro contenuto in embeddings vettoriali e memorizzarli in Milvus. Sfruttando le capacità di indicizzazione ad alte prestazioni di Milvus, i dati sono pronti per essere recuperati rapidamente.
Flusso di recupero dei documenti: scoprite come interrogare Milvus per trovare embeddings di documenti rilevanti e usarli per generare risposte perspicaci e consapevoli del contesto con gli agenti LLM di Dynamiq, creando un'esperienza utente AI senza soluzione di continuità.
Alla fine di questo tutorial, avrete una solida comprensione di come Milvus e Dynamiq lavorano insieme per costruire sistemi di intelligenza artificiale scalabili e consapevoli del contesto, adatti alle vostre esigenze.
Preparazione
Scaricare le librerie necessarie
$ pip install dynamiq pymilvus
Se si utilizza Google Colab, per abilitare le dipendenze appena installate, potrebbe essere necessario riavviare il runtime (fare clic sul menu "Runtime" nella parte superiore dello schermo e selezionare "Riavvia sessione" dal menu a discesa).
Configurare l'agente LLM
In questo esempio utilizzeremo OpenAI come LLM. È necessario preparare la chiave api OPENAI_API_KEY
come variabile d'ambiente.
import os
os.environ["OPENAI_API_KEY"] = "sk-***********"
RAG - Flusso di indicizzazione dei documenti
Questa esercitazione mostra un flusso di lavoro RAG (Retrieval-Augmented Generation) per l'indicizzazione di documenti con Milvus come database vettoriale. Il flusso di lavoro prende in input i file PDF, li elabora in parti più piccole, genera incorporazioni vettoriali utilizzando il modello di incorporazione di OpenAI e memorizza le incorporazioni in una raccolta Milvus per un recupero efficiente.
Alla fine di questo flusso di lavoro, avrete un sistema di indicizzazione dei documenti scalabile ed efficiente, in grado di supportare le future attività di RAG, come la ricerca semantica e la risposta alle domande.
Importare le librerie necessarie e inizializzare il flusso di lavoro
# Importing necessary libraries for the workflow
from io import BytesIO
from dynamiq import Workflow
from dynamiq.nodes import InputTransformer
from dynamiq.connections import (
OpenAI as OpenAIConnection,
Milvus as MilvusConnection,
MilvusDeploymentType,
)
from dynamiq.nodes.converters import PyPDFConverter
from dynamiq.nodes.splitters.document import DocumentSplitter
from dynamiq.nodes.embedders import OpenAIDocumentEmbedder
from dynamiq.nodes.writers import MilvusDocumentWriter
# Initialize the workflow
rag_wf = Workflow()
Definire il nodo convertitore PDF
converter = PyPDFConverter(document_creation_mode="one-doc-per-page")
converter_added = rag_wf.flow.add_nodes(
converter
) # Add node to the DAG (Directed Acyclic Graph)
Definire il nodo di divisione dei documenti
document_splitter = DocumentSplitter(
split_by="sentence", # Splits documents into sentences
split_length=10,
split_overlap=1,
input_transformer=InputTransformer(
selector={
"documents": f"${[converter.id]}.output.documents",
},
),
).depends_on(
converter
) # Set dependency on the PDF converter
splitter_added = rag_wf.flow.add_nodes(document_splitter) # Add to the DAG
Definire il nodo di incorporamento
embedder = OpenAIDocumentEmbedder(
connection=OpenAIConnection(api_key=os.environ["OPENAI_API_KEY"]),
input_transformer=InputTransformer(
selector={
"documents": f"${[document_splitter.id]}.output.documents",
},
),
).depends_on(
document_splitter
) # Set dependency on the splitter
document_embedder_added = rag_wf.flow.add_nodes(embedder) # Add to the DAG
Definire il nodo Milvus Vector Store
vector_store = (
MilvusDocumentWriter(
connection=MilvusConnection(
deployment_type=MilvusDeploymentType.FILE, uri="./milvus.db"
),
index_name="my_milvus_collection",
dimension=1536,
create_if_not_exist=True,
metric_type="COSINE",
)
.inputs(documents=embedder.outputs.documents) # Connect to embedder output
.depends_on(embedder) # Set dependency on the embedder
)
milvus_writer_added = rag_wf.flow.add_nodes(vector_store) # Add to the DAG
2024-11-19 22:14:03 - WARNING - Environment variable 'MILVUS_API_TOKEN' not found
2024-11-19 22:14:03 - INFO - Pass in the local path ./milvus.db, and run it using milvus-lite
2024-11-19 22:14:04 - DEBUG - Created new connection using: 0bef2849fdb1458a85df8bb9dd27f51d
2024-11-19 22:14:04 - INFO - Collection my_milvus_collection does not exist. Creating a new collection.
2024-11-19 22:14:04 - DEBUG - Successfully created collection: my_milvus_collection
2024-11-19 22:14:05 - DEBUG - Successfully created an index on collection: my_milvus_collection
2024-11-19 22:14:05 - DEBUG - Successfully created an index on collection: my_milvus_collection
Milvus offre due tipi di distribuzione, adatti a diversi casi d'uso:
- MilvusDeploymentType.FILE
- Ideale per la prototipazione locale o l'archiviazione di dati su piccola scala.
- Impostare
uri
su un percorso di file locale (ad esempio,./milvus.db
) per sfruttare Milvus Lite, che memorizza automaticamente tutti i dati nel file specificato. - Si tratta di un'opzione comoda per una rapida configurazione e sperimentazione.
- MilvusDeploymentType.HOST
Progettato per scenari di dati su larga scala, come la gestione di oltre un milione di vettori.
Server auto-ospitato
- Distribuisce un server Milvus ad alte prestazioni utilizzando Docker o Kubernetes.
- Configurare l'indirizzo e la porta del server come
uri
(ad esempio,http://localhost:19530
). - Se l'autenticazione è abilitata:
- Fornire
<your_username>:<your_password>
cometoken
. - Se l'autenticazione è disabilitata:
- Lasciare
token
non impostato.
Zilliz Cloud (Servizio gestito)
- Per un'esperienza Milvus completamente gestita e basata sul cloud, utilizzate Zilliz Cloud.
- Impostare
uri
etoken
in base all'endpoint pubblico e alla chiave API forniti nella console di Zilliz Cloud.
Definizione dei dati di input ed esecuzione del flusso di lavoro
file_paths = ["./pdf_files/WhatisMilvus.pdf"]
input_data = {
"files": [BytesIO(open(path, "rb").read()) for path in file_paths],
"metadata": [{"filename": path} for path in file_paths],
}
# Run the workflow with the prepared input data
inserted_data = rag_wf.run(input_data=input_data)
/var/folders/09/d0hx80nj35sb5hxb5cpc1q180000gn/T/ipykernel_31319/3145804345.py:4: ResourceWarning: unclosed file <_io.BufferedReader name='./pdf_files/WhatisMilvus.pdf'>
BytesIO(open(path, "rb").read()) for path in file_paths
ResourceWarning: Enable tracemalloc to get the object allocation traceback
2024-11-19 22:14:09 - INFO - Workflow 87878444-6a3d-43f3-ae32-0127564a959f: execution started.
2024-11-19 22:14:09 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution started.
2024-11-19 22:14:09 - INFO - Node PyPDF File Converter - 6eb42b1f-7637-407b-a3ac-4167bcf3b5c4: execution started.
2024-11-19 22:14:09 - INFO - Node PyPDF File Converter - 6eb42b1f-7637-407b-a3ac-4167bcf3b5c4: execution succeeded in 58ms.
2024-11-19 22:14:09 - INFO - Node DocumentSplitter - 5baed580-6de0-4dcd-bace-d7d947ab6c7f: execution started.
/Users/jinhonglin/anaconda3/envs/myenv/lib/python3.11/site-packages/websockets/legacy/__init__.py:6: DeprecationWarning: websockets.legacy is deprecated; see https://websockets.readthedocs.io/en/stable/howto/upgrade.html for upgrade instructions
warnings.warn( # deprecated in 14.0 - 2024-11-09
/Users/jinhonglin/anaconda3/envs/myenv/lib/python3.11/site-packages/pydantic/fields.py:804: PydanticDeprecatedSince20: Using extra keyword arguments on `Field` is deprecated and will be removed. Use `json_schema_extra` instead. (Extra keys: 'is_accessible_to_agent'). Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.7/migration/
warn(
2024-11-19 22:14:09 - INFO - Node DocumentSplitter - 5baed580-6de0-4dcd-bace-d7d947ab6c7f: execution succeeded in 104ms.
2024-11-19 22:14:09 - INFO - Node OpenAIDocumentEmbedder - 91928f67-a00f-48f6-a864-f6e21672ec7e: execution started.
2024-11-19 22:14:09 - INFO - Node OpenAIDocumentEmbedder - d30a4cdc-0fab-4aff-b2e5-6161a62cb6fd: execution started.
2024-11-19 22:14:10 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:10 - INFO - Node OpenAIDocumentEmbedder - d30a4cdc-0fab-4aff-b2e5-6161a62cb6fd: execution succeeded in 724ms.
2024-11-19 22:14:10 - INFO - Node MilvusDocumentWriter - dddab4cc-1dae-4e7e-9101-1ec353f530da: execution started.
2024-11-19 22:14:10 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:10 - INFO - Node MilvusDocumentWriter - dddab4cc-1dae-4e7e-9101-1ec353f530da: execution succeeded in 66ms.
2024-11-19 22:14:10 - INFO - Node OpenAIDocumentEmbedder - 91928f67-a00f-48f6-a864-f6e21672ec7e: execution succeeded in 961ms.
2024-11-19 22:14:10 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution succeeded in 1.3s.
2024-11-19 22:14:10 - INFO - Workflow 87878444-6a3d-43f3-ae32-0127564a959f: execution succeeded in 1.3s.
Attraverso questo flusso di lavoro, abbiamo implementato con successo una pipeline di indicizzazione di documenti utilizzando Milvus come database vettoriale e il modello di embedding di OpenAI per la rappresentazione semantica. Questa configurazione consente un recupero rapido e accurato basato su vettori, costituendo la base per i flussi di lavoro RAG come la ricerca semantica, il recupero di documenti e le interazioni contestuali guidate dall'intelligenza artificiale.
Grazie alle capacità di archiviazione scalabili di Milvus e all'orchestrazione di Dynamiq, questa soluzione è pronta sia per la prototipazione che per le implementazioni di produzione su larga scala. È ora possibile estendere questa pipeline per includere attività aggiuntive come la risposta a domande basate sul reperimento o la generazione di contenuti guidati dall'intelligenza artificiale.
Flusso di recupero dei documenti RAG
In questa esercitazione, implementiamo un flusso di recupero di documenti RAG (Retrieval-Augmented Generation). Questo flusso di lavoro prende una query dell'utente, genera un embedding vettoriale per essa, recupera i documenti più rilevanti da un database vettoriale Milvus e utilizza un modello linguistico di grandi dimensioni (LLM) per generare una risposta dettagliata e consapevole del contesto in base ai documenti recuperati.
Seguendo questo flusso di lavoro, si crea una soluzione end-to-end per la ricerca semantica e la risposta alle domande, combinando la potenza del recupero di documenti basato su vettori con le capacità dei modelli linguistici avanzati di OpenAI. Questo approccio consente di rispondere in modo efficiente e intelligente alle domande degli utenti, sfruttando le conoscenze memorizzate nel database dei documenti.
Importazione delle librerie necessarie e inizializzazione del flusso di lavoro
from dynamiq import Workflow
from dynamiq.connections import (
OpenAI as OpenAIConnection,
Milvus as MilvusConnection,
MilvusDeploymentType,
)
from dynamiq.nodes.embedders import OpenAITextEmbedder
from dynamiq.nodes.retrievers import MilvusDocumentRetriever
from dynamiq.nodes.llms import OpenAI
from dynamiq.prompts import Message, Prompt
# Initialize the workflow
retrieval_wf = Workflow()
Definire la connessione e l'incorporatore di testo OpenAI
# Establish OpenAI connection
openai_connection = OpenAIConnection(api_key=os.environ["OPENAI_API_KEY"])
# Define the text embedder node
embedder = OpenAITextEmbedder(
connection=openai_connection,
model="text-embedding-3-small",
)
# Add the embedder node to the workflow
embedder_added = retrieval_wf.flow.add_nodes(embedder)
Definire il recuperatore di documenti Milvus
document_retriever = (
MilvusDocumentRetriever(
connection=MilvusConnection(
deployment_type=MilvusDeploymentType.FILE, uri="./milvus.db"
),
index_name="my_milvus_collection",
dimension=1536,
top_k=5,
)
.inputs(embedding=embedder.outputs.embedding) # Connect to embedder output
.depends_on(embedder) # Dependency on the embedder node
)
# Add the retriever node to the workflow
milvus_retriever_added = retrieval_wf.flow.add_nodes(document_retriever)
2024-11-19 22:14:19 - WARNING - Environment variable 'MILVUS_API_TOKEN' not found
2024-11-19 22:14:19 - INFO - Pass in the local path ./milvus.db, and run it using milvus-lite
2024-11-19 22:14:19 - DEBUG - Created new connection using: 98d1132773af4298a894ad5925845fd2
2024-11-19 22:14:19 - INFO - Collection my_milvus_collection already exists. Skipping creation.
Definire il modello di prompt
# Define the prompt template for the LLM
prompt_template = """
Please answer the question based on the provided context.
Question: {{ query }}
Context:
{% for document in documents %}
- {{ document.content }}
{% endfor %}
"""
# Create the prompt object
prompt = Prompt(messages=[Message(content=prompt_template, role="user")])
Definire il generatore di risposte
answer_generator = (
OpenAI(
connection=openai_connection,
model="gpt-4o",
prompt=prompt,
)
.inputs(
documents=document_retriever.outputs.documents,
query=embedder.outputs.query,
)
.depends_on(
[document_retriever, embedder]
) # Dependencies on retriever and embedder
)
# Add the answer generator node to the workflow
answer_generator_added = retrieval_wf.flow.add_nodes(answer_generator)
Eseguire il flusso di lavoro
# Run the workflow with a sample query
sample_query = "What is the Advanced Search Algorithms in Milvus?"
result = retrieval_wf.run(input_data={"query": sample_query})
answer = result.output.get(answer_generator.id).get("output", {}).get("content")
print(answer)
2024-11-19 22:14:22 - INFO - Workflow f4a073fb-dfb6-499c-8cac-5710a7ad6d47: execution started.
2024-11-19 22:14:22 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution started.
2024-11-19 22:14:22 - INFO - Node OpenAITextEmbedder - 47afb0bc-cf96-429d-b58f-11b6c935fec3: execution started.
2024-11-19 22:14:23 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2024-11-19 22:14:23 - INFO - Node OpenAITextEmbedder - 47afb0bc-cf96-429d-b58f-11b6c935fec3: execution succeeded in 474ms.
2024-11-19 22:14:23 - INFO - Node MilvusDocumentRetriever - 51c8311b-4837-411f-ba42-21e28239a2ee: execution started.
2024-11-19 22:14:23 - INFO - Node MilvusDocumentRetriever - 51c8311b-4837-411f-ba42-21e28239a2ee: execution succeeded in 23ms.
2024-11-19 22:14:23 - INFO - Node LLM - ac722325-bece-453f-a2ed-135b0749ee7a: execution started.
2024-11-19 22:14:24 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2024-11-19 22:14:24 - INFO - Node LLM - ac722325-bece-453f-a2ed-135b0749ee7a: execution succeeded in 1.8s.
2024-11-19 22:14:25 - INFO - Flow b30b48ec-d5d2-4e4c-8e25-d6976c8a9c17: execution succeeded in 2.4s.
2024-11-19 22:14:25 - INFO - Workflow f4a073fb-dfb6-499c-8cac-5710a7ad6d47: execution succeeded in 2.4s.
The advanced search algorithms in Milvus include a variety of in-memory and on-disk indexing/search algorithms such as IVF (Inverted File), HNSW (Hierarchical Navigable Small World), and DiskANN. These algorithms have been deeply optimized to enhance performance, delivering 30%-70% better performance compared to popular implementations like FAISS and HNSWLib. These optimizations are part of Milvus's design to ensure high efficiency and scalability in handling vector data.