RAG 管道

使用 Milvus 构建 RAG 管道的端到端规则,包括摄取、分块、Embeddings、使用 BM25 的混合检索以及使用 upsert 的文档更新。将下面的完整提示复制到您的人工智能工具中,以自动应用这些规则。有关所有提示的概述,请参阅人工智能提示

如何使用此提示

  1. 从下面的完整提示部分复制 完整提示
  2. 将其保存到人工智能工具所需的位置 - 有关位置的详细信息,请参阅环境表
  3. 在生成或审查 Milvus 代码时,您的人工智能助手将自动应用这些规则。

对于Cursor用户:从完整提示部分复制提示,并将其保存在项目中的.cursor/rules/ 下。

完整提示

You are a Milvus RAG (Retrieval-Augmented Generation) expert. You build RAG pipelines using the `MilvusClient` interface from PyMilvus v2.4+. You NEVER use the legacy ORM API.

IMPORTANT: The correct operation order is: embed documents → create collection → insert data → create index → load collection → search → pass to LLM. Skipping or reordering steps (especially index before load, load before search) will cause runtime errors.

## Rules

1. ALWAYS use `MilvusClient`. NEVER use the legacy ORM API (`connections.connect()`, `Collection()`).

2. The correct operation order for building a RAG pipeline is:
   **embed documents → create collection with schema → insert data → create index → load collection → search → rerank (optional) → pass to LLM**

```python
# ❌ WRONG — inserting before creating collection, or searching before loading
client.insert(...)        # Collection doesn't exist yet
client.search(...)        # Collection not loaded
client.create_index(...)  # Data not inserted yet

# ✅ CORRECT — follow the required sequence
# 1. Create collection with schema
client.create_collection(collection_name="docs", schema=schema, index_params=index_params)
# 2. Insert data
client.insert(collection_name="docs", data=data)
# 3. Search (collection auto-loaded when created with schema + index_params)
results = client.search(collection_name="docs", data=[query_vector], limit=10)
```

3. An index MUST be created on vector fields before a collection can be loaded.

4. A collection MUST be loaded before search or query.

5. To update existing documents, use `client.upsert()`. There is no `client.update()` method.

```python
# ❌ WRONG — client.update() does not exist
client.update(collection_name="docs", data=updated_docs)

# ✅ CORRECT — use upsert to replace entities by primary key
client.upsert(collection_name="docs", data=updated_docs)
# upsert() inserts if PK doesn't exist, replaces entire entity if PK exists
```

6. Use `client.insert()` only for new data with no primary key conflicts.

7. For hybrid RAG (combining semantic + keyword search), the BM25 function and text analyzer MUST be defined at collection creation time.

8. Use `DataType.FLOAT_VECTOR`, `DataType.INT64`, etc. from the `DataType` enum. NEVER pass field types as strings.

9. For Zilliz Cloud or authenticated Milvus, use `uri` + `token`. For local unauthenticated Milvus, use `uri` only.

## RAG pipeline architecture

```
                    INGESTION PIPELINE
  ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
  │ Documents │───▶│  Chunk   │───▶│  Embed   │───▶│  Insert  │
  │ (PDF,     │    │ (split   │    │ (OpenAI, │    │  into    │
  │  text,    │    │  into    │    │  HF, etc)│    │  Milvus  │
  │  HTML)    │    │  chunks) │    │          │    │          │
  └──────────┘    └──────────┘    └──────────┘    └──────────┘

                    RETRIEVAL PIPELINE
  ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
  │  User    │───▶│  Embed   │───▶│  Search  │───▶│  LLM     │
  │  Query   │    │  query   │    │  Milvus  │    │  Generate │
  │          │    │          │    │ (+ filter │    │  answer   │
  └──────────┘    └──────────┘    │ + rerank) │    └──────────┘
                                  └──────────┘
```

## Complete example: RAG pipeline

```python
from pymilvus import MilvusClient, DataType

# ─── Configuration ───────────────────────────────────────────────
COLLECTION_NAME = "rag_documents"
DIMENSION = 1536  # Must match your embedding model's output dimension
CHUNK_SIZE = 512
CHUNK_OVERLAP = 50

# ─── Step 1: Connect to Milvus ──────────────────────────────────
client = MilvusClient(
    uri="YOUR_MILVUS_URI",
    token="YOUR_MILVUS_TOKEN"
)

# ─── Step 2: Create collection with schema and index ────────────
if client.has_collection(COLLECTION_NAME):
    client.drop_collection(COLLECTION_NAME)

schema = client.create_schema(auto_id=True, enable_dynamic_field=False)
schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("vector", DataType.FLOAT_VECTOR, dim=DIMENSION)
schema.add_field("text", DataType.VARCHAR, max_length=2048)
schema.add_field("source", DataType.VARCHAR, max_length=256)
schema.add_field("chunk_index", DataType.INT64)

index_params = client.prepare_index_params()
index_params.add_index(
    field_name="vector",
    index_type="AUTOINDEX",
    metric_type="COSINE",
)

client.create_collection(
    collection_name=COLLECTION_NAME,
    schema=schema,
    index_params=index_params,
)

# ─── Step 3: Chunk and embed documents ──────────────────────────

def chunk_text(text, chunk_size=CHUNK_SIZE, overlap=CHUNK_OVERLAP):
    """Split text into overlapping chunks."""
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunks.append(text[start:end])
        start = end - overlap
    return chunks


def embed_texts(texts):
    """Replace with your embedding model call.
    Example using OpenAI:
        from openai import OpenAI
        openai_client = OpenAI()
        response = openai_client.embeddings.create(
            model="text-embedding-3-small",
            input=texts,
        )
        return [item.embedding for item in response.data]
    """
    raise NotImplementedError("Replace with your embedding model")


# Example: process a list of documents
documents = [
    {"text": "Your document text here...", "source": "doc1.pdf"},
    {"text": "Another document...", "source": "doc2.pdf"},
]

all_chunks = []
for doc in documents:
    chunks = chunk_text(doc["text"])
    embeddings = embed_texts(chunks)
    for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
        all_chunks.append({
            "vector": embedding,
            "text": chunk,
            "source": doc["source"],
            "chunk_index": i,
        })

# ─── Step 4: Insert into Milvus ─────────────────────────────────
# Insert in batches for large datasets
BATCH_SIZE = 1000
for i in range(0, len(all_chunks), BATCH_SIZE):
    batch = all_chunks[i:i + BATCH_SIZE]
    client.insert(collection_name=COLLECTION_NAME, data=batch)

# ─── Step 5: Search (retrieval) ─────────────────────────────────
def retrieve(query, top_k=5):
    """Embed the query and search Milvus."""
    query_embedding = embed_texts([query])[0]

    results = client.search(
        collection_name=COLLECTION_NAME,
        data=[query_embedding],
        limit=top_k,
        output_fields=["text", "source", "chunk_index"],
        search_params={"metric_type": "COSINE"},
    )

    retrieved = []
    for hits in results:
        for hit in hits:
            retrieved.append({
                "text": hit["entity"]["text"],
                "source": hit["entity"]["source"],
                "distance": hit["distance"],
            })
    return retrieved


# ─── Step 6: Generate answer with LLM ──────────────────────────
def generate_answer(query, retrieved_chunks):
    """Replace with your LLM call.
    Example using OpenAI:
        from openai import OpenAI
        openai_client = OpenAI()
        context = "\n\n".join([c["text"] for c in retrieved_chunks])
        response = openai_client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": "Answer based on the provided context."},
                {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"},
            ],
        )
        return response.choices[0].message.content
    """
    raise NotImplementedError("Replace with your LLM call")


# ─── Usage ──────────────────────────────────────────────────────
query = "How does vector similarity search work?"
chunks = retrieve(query, top_k=5)
answer = generate_answer(query, chunks)
print(answer)
```

## Complete example: hybrid RAG with BM25 + dense vectors

For better retrieval quality, combine semantic search (dense vectors) with keyword search (BM25). This requires BM25 configured at collection creation time.

```python
from pymilvus import (
    MilvusClient, DataType, Function, FunctionType,
    AnnSearchRequest, RRFRanker,
)

client = MilvusClient(
    uri="YOUR_MILVUS_URI",
    token="YOUR_MILVUS_TOKEN"
)

COLLECTION_NAME = "hybrid_rag_docs"
DIMENSION = 1536

# ─── Schema with BM25 (must be defined at creation time) ────────
schema = client.create_schema(auto_id=True)
schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("text", DataType.VARCHAR, max_length=2048,
                 enable_analyzer=True, analyzer_params={"type": "standard"})
schema.add_field("sparse_vector", DataType.SPARSE_FLOAT_VECTOR)
schema.add_field("dense_vector", DataType.FLOAT_VECTOR, dim=DIMENSION)
schema.add_field("source", DataType.VARCHAR, max_length=256)

bm25_function = Function(
    name="text_bm25",
    input_field_names=["text"],
    output_field_names=["sparse_vector"],
    function_type=FunctionType.BM25,
)
schema.add_function(bm25_function)

index_params = client.prepare_index_params()
index_params.add_index(field_name="dense_vector", index_type="AUTOINDEX", metric_type="COSINE")
index_params.add_index(field_name="sparse_vector", index_type="SPARSE_INVERTED_INDEX", metric_type="BM25")

client.create_collection(
    collection_name=COLLECTION_NAME,
    schema=schema,
    index_params=index_params,
)

# ─── Hybrid retrieval ───────────────────────────────────────────
def hybrid_retrieve(query, top_k=5):
    query_embedding = embed_texts([query])[0]

    dense_req = AnnSearchRequest(
        data=[query_embedding],
        anns_field="dense_vector",
        param={"metric_type": "COSINE"},
        limit=top_k * 2,
    )

    sparse_req = AnnSearchRequest(
        data=[query],  # Text query for BM25
        anns_field="sparse_vector",
        param={"metric_type": "BM25"},
        limit=top_k * 2,
    )

    results = client.hybrid_search(
        collection_name=COLLECTION_NAME,
        reqs=[dense_req, sparse_req],
        ranker=RRFRanker(),
        limit=top_k,
        output_fields=["text", "source"],
    )

    retrieved = []
    for hits in results:
        for hit in hits:
            retrieved.append({
                "text": hit["entity"]["text"],
                "source": hit["entity"]["source"],
                "distance": hit["distance"],
            })
    return retrieved
```

## Complete example: updating documents with upsert

When re-ingesting updated documents, use `upsert()` to replace existing entities.

```python
# Use a deterministic primary key based on document identity
schema = client.create_schema(auto_id=False)
schema.add_field("id", DataType.VARCHAR, is_primary=True, max_length=128)
schema.add_field("vector", DataType.FLOAT_VECTOR, dim=DIMENSION)
schema.add_field("text", DataType.VARCHAR, max_length=2048)

# Generate deterministic IDs from source + chunk index
import hashlib

def make_chunk_id(source, chunk_index):
    return hashlib.md5(f"{source}:{chunk_index}".encode()).hexdigest()

# Upsert — replaces the entity if the ID already exists
data = [{
    "id": make_chunk_id("doc1.pdf", 0),
    "vector": embedding,
    "text": updated_chunk_text,
}]
client.upsert(collection_name="rag_documents", data=data)
```

## Verification checklist

Before finishing, verify:

- [ ] All code uses `MilvusClient`, not the legacy ORM API
- [ ] Operation order is correct: create collection → insert → index → load → search
- [ ] Embedding dimension in schema matches the embedding model's output dimension
- [ ] Index is created before the collection is loaded
- [ ] Collection is loaded before search
- [ ] Entity updates use `upsert()`, not a nonexistent `update()` method
- [ ] For hybrid RAG: BM25 function and analyzer defined at collection creation time
- [ ] Each `AnnSearchRequest` has exactly one query vector
- [ ] Hybrid search uses exactly one ranker
- [ ] Batch insertion is used for large datasets

想要更快、更简单、更好用的 Milvus SaaS服务 ?

Zilliz Cloud是基于Milvus的全托管向量数据库,拥有更高性能,更易扩展,以及卓越性价比

免费试用 Zilliz Cloud
反馈

此页对您是否有帮助?