Milvus
Zilliz
  • Home
  • Blog
  • 超越语境超载:Parlant × Milvus 如何为 LLM Agents 行为带来控制力和清晰度

超越语境超载:Parlant × Milvus 如何为 LLM Agents 行为带来控制力和清晰度

  • Tutorials
November 05, 2025
Min Yin

想象一下,你被要求完成一项涉及 200 个业务规则、50 个工具和 30 个演示的任务,而你只有一个小时的时间。这根本不可能。然而,当我们把大型语言模型变成 "Agent",并给它们加载过多指令时,我们往往期望它们能够做到这一点。

实际上,这种方法很快就会失败。传统的 Agents 框架(如 LangChain 或 LlamaIndex)会一次性将所有规则和工具注入模型的上下文中,从而导致规则冲突、上下文过载以及生产中不可预测的行为。

为了解决这个问题,一个名为 Parlant的开源 Agents 框架最近在 GitHub 上受到了越来越多的关注。它引入了一种名为 "对齐模型"(Alignment Modeling)的新方法,以及监督机制和条件转换,使代理行为的可控性和可解释性大大提高。

如果与开源向量数据库Milvus 搭配使用,Parlant 的功能将更加强大。Milvus 增加了语义智能,允许 Agents 实时动态检索最相关的规则和上下文,使其保持准确、高效,并为生产做好准备。

在这篇文章中,我们将探讨 Parlant 如何在暗中工作--以及如何将其与 Milvus 集成,实现生产级功能。

传统 Agents 框架为何分崩离析

传统的 Agents 框架喜欢大而全:数百条规则、数十种工具和少量演示--所有这些都塞进了一个拥挤不堪的提示中。这在演示或小型沙盒测试中可能看起来不错,但一旦将其推向生产,裂缝就会迅速显现。

  • 冲突的规则带来混乱:当两个或两个以上的规则同时适用时,这些框架没有内置的方法来决定哪一个获胜。有时它会选择其中一个。有时会混合两种规则。有时,它还会做一些完全无法预测的事情。

  • 边缘案例暴露漏洞:你不可能预测用户可能说的每一句话。而当你的模型遇到训练数据之外的情况时,它就会默认为通用的、不置可否的答案。

  • 调试既痛苦又昂贵:当 Agents 出现异常时,几乎不可能确定是哪条规则导致了问题。由于所有问题都集中在一个巨大的系统提示符中,修复问题的唯一方法就是重写提示符,然后从头开始重新测试。

什么是 Parlant 及其工作原理

Parlant 是 LLM 代理的开源对齐引擎。通过以结构化、基于规则的方式为 Agents 的决策过程建模,您可以精确控制 Agents 在不同场景中的行为方式。

为了解决传统 Agents 框架中存在的问题,Parlant 引入了一种新的强大方法:对齐模型。其核心理念是将规则定义与规则执行分离开来,确保在任何给定时间内,只有最相关的规则才会被注入到 LLM 的上下文中。

细化指南:对齐模型的核心

Parlant 对齐模型的核心是 "细化指南"(Granular Guidelines)概念。与其编写一个充满规则的巨型系统提示,不如定义小型、模块化的指导原则--每条指导原则都描述了代理应如何处理特定类型的情况。

每条准则由三部分组成:

  • 条件--规则适用时间的自然语言描述。Parlant 将条件转换为语义向量,并将其与用户输入相匹配,以确定是否相关。

  • 操作--明确的指令,定义一旦条件满足,Agent 应如何做出反应。该操作只有在触发时才会注入 LLM 的上下文。

  • 工具- 与特定规则相关的任何外部函数或 API。只有当准则处于活动状态时,代理才会接触到这些工具,从而保持工具使用的可控性和上下文感知性。

await agent.create_guideline(
    condition="The user asks about a refund and the order amount exceeds 500 RMB",
    action="First call the order status check tool to confirm whether the refund conditions are met, then provide a detailed explanation of the refund process",
    tools=[check_order_status, calculate_refund_amount]
)

每次用户与 Agents 交互时,Parlant 都会运行一个轻量级匹配步骤,找出三到五个最相关的准则。只有这些规则才会注入到模型的上下文中,从而使提示简洁明了、重点突出,同时确保 Agents 始终遵循正确的规则。

确保准确性和一致性的监督机制

为了进一步保持准确性和一致性,Parlant 引入了一种监督机制,作为第二层质量控制。这一过程分为三个步骤:

1.生成候选回复--Agent 根据匹配的准则和当前对话语境创建初始回复。

2.检查合规性--将回复与活动指南进行比较,以验证是否正确遵循了每一条指令。

3.修改或确认--如果发现任何问题,系统会纠正输出;如果一切正常,回复会被批准并发送给用户。

这种监督机制确保 Agents 不仅理解规则,而且在回复前切实遵守规则--既提高了可靠性,又增强了控制力。

控制和安全的条件转换

在传统 Agents 框架中,所有可用工具都会随时暴露给 LLM。这种 "一切都摆在桌面上 "的方法往往会导致过载提示和意外的工具调用。Parlant 通过条件转换解决了这一问题。与状态机的工作原理类似,只有在满足特定条件时,才会触发操作或工具。每个工具都与相应的准则紧密相连,只有当该准则的条件被激活时,工具才会可用。

# The balance inquiry tool is exposed only when the condition "the user wants to make a transfer" is met
await agent.create_guideline(
    condition="The user wants to make a transfer",
    action="First check the account balance. If the balance is below 500 RMB, remind the user that an overdraft fee may apply.",
    tools=[get_user_account_balance]
)

这种机制将工具调用转化为条件转换--只有当工具的触发条件得到满足时,工具才会从 "非活动 "状态转为 "活动 "状态。通过这种结构化的执行方式,Parlant 可以确保每项操作都是经过深思熟虑并根据具体情况进行的,从而在提高效率和系统安全性的同时防止误用。

Milvus 如何为 Parlant 提供动力

当我们深入了解 Parlant 的指南匹配流程时,一个核心技术挑战变得清晰可见:系统如何在几毫秒内从数百甚至数千个选项中找出三到五个最相关的规则?这正是向量数据库的用武之地。语义检索使这成为可能。

Milvus如何支持Parlant的指南匹配流程

指南匹配是通过语义相似性实现的。每条准则的 "条件 "字段都会被转换为向量嵌入,从而捕捉其含义,而不仅仅是字面文本。当用户发送信息时,Parlant 会将该信息的语义与所有存储的指南嵌入进行比较,以找出最相关的指南。

以下是整个过程的具体步骤:

1.对查询进行编码--将用户的信息和最近的对话历史转化为查询向量。

2.搜索相似性--系统在指南向量存储区内执行相似性搜索,以找到最接近的匹配项。

3.检索 Top-K 结果- 返回语义最相关的前三到五个指南。

4.插入上下文--然后将这些匹配的指南动态插入 LLM 的上下文中,以便模型能够根据正确的规则行事。

要实现这一工作流程,向量数据库必须具备三个关键能力:高性能近似近邻(ANN)搜索、灵活的元数据过滤和实时向量更新。开源云原生向量数据库Milvus 在这三个方面都能提供生产级性能。

为了了解 Milvus 在实际场景中的工作原理,我们以一个金融服务 Agents 为例。

假设系统定义了 800 项业务指南,涵盖账户查询、资金转账和财富管理产品咨询等任务。在此设置中,Milvus 充当所有指南数据的存储和检索层。

from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
import parlant.sdk as p

# Connect to Milvus connections.connect(host=“localhost”, port=“19530”)

# Define the schema for the guideline collection fields = [ FieldSchema(name=“guideline_id”, dtype=DataType.VARCHAR, max_length=100, is_primary=True), FieldSchema(name=“condition_vector”, dtype=DataType.FLOAT_VECTOR, dim=768), FieldSchema(name=“condition_text”, dtype=DataType.VARCHAR, max_length=1000), FieldSchema(name=“action_text”, dtype=DataType.VARCHAR, max_length=2000), FieldSchema(name=“priority”, dtype=DataType.INT64), FieldSchema(name=“business_domain”, dtype=DataType.VARCHAR, max_length=50) ] schema = CollectionSchema(fields=fields, description=“Agent Guidelines”) guideline_collection = Collection(name=“agent_guidelines”, schema=schema)

# Create an HNSW index for high-performance retrieval index_params = { “index_type”: “HNSW”, “metric_type”: “COSINE”, “params”: {“M”: 16, “efConstruction”: 200} } guideline_collection.create_index(field_name=“condition_vector”, index_params=index_params)

现在,当用户说 "我想向我母亲的账户转账 10 万元人民币 "时,运行时的流程是

1.对查询进行重校--将用户输入转换为 768 维向量。

2.混合检索- 在 Milvus 中运行向量相似性检索,并进行元数据过滤(如business_domain="transfer" )。

3.结果排序- 根据相似性得分和优先级值对候选指南进行排序。

4.上下文注入--将前 3 个匹配指南的action_text 注入 Parlant 代理的上下文。

在这种配置下,即使指南库扩展到 10 万个条目,Milvus 的 P99 延迟也能低于 15 毫秒。相比之下,使用传统关系数据库进行关键字匹配时,延迟时间通常超过 200 毫秒,匹配准确率也会大大降低。

Milvus 如何实现长期记忆和个性化

Milvus 所做的不仅仅是指南匹配。在代理需要长期记忆和个性化响应的场景中,Milvus 可以充当记忆层,以向量嵌入的形式存储和检索用户过去的互动,帮助代理记住之前讨论的内容。

# store user’s past interactions
user_memory_fields = [
    FieldSchema(name="interaction_id", dtype=DataType.VARCHAR, max_length=100, is_primary=True),
    FieldSchema(name="user_id", dtype=DataType.VARCHAR, max_length=50),
    FieldSchema(name="interaction_vector", dtype=DataType.FLOAT_VECTOR, dim=768),
    FieldSchema(name="interaction_summary", dtype=DataType.VARCHAR, max_length=500),
    FieldSchema(name="timestamp", dtype=DataType.INT64)
]
memory_collection = Collection(name="user_memory", schema=CollectionSchema(user_memory_fields))

当同一个用户再次访问时,Agent 可以从 Milvus 中检索出最相关的历史互动,并利用这些互动生成联系更紧密、类似人类的体验。例如,如果用户上周询问了投资基金的情况,Agent 就能回忆起当时的上下文,并主动做出回应:"欢迎回来!您对我们上次讨论的基金还有问题吗?

如何优化 Milvus 驱动的 Agents 系统性能

在生产环境中部署由 Milvus 支持的 Agents 系统时,性能调整变得至关重要。要实现低延迟和高吞吐量,需要注意几个关键参数:

1.选择正确的索引类型

选择合适的索引结构非常重要。例如,HNSW(Hierarchical Navigable Small World,分层导航小世界)是金融或医疗保健等高调用场景的理想选择,在这些场景中,准确性至关重要。IVF_FLAT 则更适合电子商务推荐等大规模应用,在这些应用中,可以接受稍低的召回率来换取更快的性能和更少的内存使用。

2.分片策略

当存储的指南条目数超过一百万时,建议使用分区(Partition)按业务领域或用例划分数据。分区可以减少每次查询的搜索空间,提高检索速度,并在数据集增长时保持稳定的延迟。

3.缓存配置

对于频繁访问的准则,如标准客户查询或高流量工作流,可以使用 Milvus 查询结果缓存。这允许系统重复使用以前的结果,将重复搜索的延迟时间缩短到 5 毫秒以下。

实践演示:如何使用 Parlant 和 Milvus Lite 构建智能问答系统

Milvus Lite是 Milvus 的轻量级版本,它是一个 Python 库,可以轻松嵌入到您的应用程序中。它非常适合在 Jupyter Notebooks 等环境中进行快速原型开发,或在计算资源有限的边缘设备和智能设备上运行。尽管 Milvus Lite 占用空间小,但它支持与 Milvus 其他部署相同的 API。这意味着您为 Milvus Lite 编写的客户端代码以后可以无缝连接到完整的 Milvus 或 Zilliz Cloud 实例--无需重构。

在本演示中,我们将把 Milvus Lite 与 Parlant 结合使用,演示如何构建一个智能问答系统,以最少的设置提供快速、上下文感知的答案。

前提条件: 1.Parlant GitHub

1.帕兰特 GitHub: https://github.com/emcie-co/parlant

2.Parlant 文档: https://parlant.io/docs

3.python3.10以上

4.OpenAI_key

5.MlivusLite

第 1 步:安装依赖项

# Install required Python packages
pip install pymilvus parlant openai
# Or, if you’re using a Conda environment:
conda activate your_env_name
pip install pymilvus parlant openai

第 2 步:配置环境变量

# Set your OpenAI API key
export OPENAI_API_KEY="your_openai_api_key_here"
# Verify that the variable is set correctly
echo $OPENAI_API_KEY

第 3 步:执行核心代码

  • 创建自定义 OpenAI 嵌入器
class OpenAIEmbedder(p.Embedder):
    # Converts text into vector embeddings with built-in timeout and retry
    # Dimension: 1536 (text-embedding-3-small)
    # Timeout: 60 seconds; Retries: up to 2 times
  • 初始化知识库

1.创建一个名为 kb_articles 的 Collections。

2.插入样本数据(如退款政策、换货政策、发货时间)。

3.建立一个 HNSW 索引,以加速检索。

  • 构建向量搜索工具
@p.tool
async def vector_search(query: str, top_k: int = 5, min_score: float = 0.35):
    # 1. Convert user query into a vector
    # 2. Perform similarity search in Milvus
    # 3. Return results with relevance above threshold
  • 配置 Parlant Agents

指导原则 1:对于事实或政策相关问题,Agent 必须首先执行向量搜索。

准则 2:找到证据后,Agent 必须使用结构化模板(摘要+要点+来源)进行回复。

# Guideline 1: Run vector search for factual or policy-related questions
await agent.create_guideline(
            condition="User asks a factual question about policy, refund, exchange, or shipping",
            action=(
                "Call vector_search with the user's query. "
                "If evidence is found, synthesize an answer by quoting key sentences and cite doc_id/title. "
                "If evidence is insufficient, ask a clarifying question before answering."
            ),
            tools=[vector_search],

# Guideline 2: Use a standardized, structured response when evidence is available await agent.create_guideline( condition=“Evidence is available”, action=( “Answer with the following template:\n” “Summary: provide a concise conclusion.\n” “Key points: 2-3 bullets distilled from evidence.\n” “Sources: list doc_id and title.\n” “Note: if confidence is low, state limitations and ask for clarification.” ), tools=[], )

tools=[],

)

  • 编写完整代码
import os
import asyncio
import json
from typing import List, Dict, Any
import parlant.sdk as p
from pymilvus import MilvusClient, DataType
# 1) Environment variables: using OpenAI (as both the default generation model and embedding service)
# Make sure the OPENAI_API_KEY is set
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    raise RuntimeError("Please set OPENAI_API_KEY environment variable")
# 2) Initialize Milvus Lite (runs locally, no standalone service required)
# MilvusClient runs in Lite mode using a local file path (requires pymilvus >= 2.x)
client = MilvusClient("./milvus_demo.db")  # Lite mode uses a local file path
COLLECTION = "kb_articles"
# 3) Example data: three policy or FAQ entries (in practice, you can load and chunk data from files)
DOCS = [
    {"doc_id": "POLICY-001", "title": "Refund Policy", "chunk": "Refunds are available within 30 days of purchase if the product is unused."},
    {"doc_id": "POLICY-002", "title": "Exchange Policy", "chunk": "Exchanges are permitted within 15 days; original packaging required."},
    {"doc_id": "FAQ-101", "title": "Shipping Time", "chunk": "Standard shipping usually takes 3–5 business days within the country."},
]
# 4) Generate embeddings using OpenAI (you can replace this with another embedding service)
# Here we use Parlant’s built-in OpenAI embedder for simplicity, but you could also call the OpenAI SDK directly.
class OpenAIEmbedder(p.Embedder):
    async def embed(self, texts: List[str], hints: Dict[str, Any] = {}) -> p.EmbeddingResult:
        # Generate text embeddings using the OpenAI API, with timeout and retry handling
        import openai
        try:
            client = openai.AsyncOpenAI(
                api_key=OPENAI_API_KEY,
                timeout=60.0,  # 60-second timeout
                max_retries=2  # Retry up to 2 times
            )
            print(f"Generating embeddings for {len(texts)} texts...")
            response = await client.embeddings.create(
                model="text-embedding-3-small",
                input=texts
            )
            vectors = [data.embedding for data in response.data]
            print(f"Successfully generated {len(vectors)} embeddings.")
            return p.EmbeddingResult(vectors=vectors)
        except Exception as e:
            print(f"OpenAI API call failed: {e}")
            # Return mock vectors for testing Milvus connectivity
            print("Using mock vectors for testing...")
            import random
            vectors = [[random.random() for _ in range(1536)] for _ in texts]
            return p.EmbeddingResult(vectors=vectors)
    @property
    def id(self) -> str:
        return "text-embedding-3-small"
    @property
    def max_tokens(self) -> int:
        return 8192
    @property
    def tokenizer(self) -> p.EstimatingTokenizer:
        from parlant.core.nlp.tokenization import ZeroEstimatingTokenizer
        return ZeroEstimatingTokenizer()
    @property
    def dimensions(self) -> int:
        return 1536
embedder = OpenAIEmbedder()
async def ensure_collection_and_load():
    # Create the collection (schema: primary key, vector field, additional fields)
    if not client.has_collection(COLLECTION):
        client.create_collection(
            collection_name=COLLECTION,
            dimension=len((await embedder.embed(["dimension_probe"])).vectors[0]),
            # Default metric: COSINE (can be changed with metric_type="COSINE")
            auto_id=True,
        )
        # Create an index to speed up retrieval (HNSW used here as an example)
        client.create_index(
            collection_name=COLLECTION,
            field_name="vector",
            index_type="HNSW",
            metric_type="COSINE",
            params={"M": 32, "efConstruction": 200}
        )
    # Insert data (skip if already exists; simple idempotent logic for the demo)
    # Generate embeddings
    chunks = [d["chunk"] for d in DOCS]
    embedding_result = await embedder.embed(chunks)
    vectors = embedding_result.vectors
    # Check if the same doc_id already exists; this is for demo purposes only — real applications should use stricter deduplication
    # Here we insert directly. In production, use an upsert operation or an explicit primary key
    client.insert(
        COLLECTION,
        data=[
            {"vector": vectors[i], "doc_id": DOCS[i]["doc_id"], "title": DOCS[i]["title"], "chunk": DOCS[i]["chunk"]}
            for i in range(len(DOCS))
        ],
    )
    # Load into memory
    client.load_collection(COLLECTION)
# 5) Define the vector search tool (Parlant Tool)
@p.tool
async def vector_search(context: p.ToolContext, query: str, top_k: int = 5, min_score: float = 0.35) -> p.ToolResult:
    # 5.1 Generate the query vector
    embed_res = await embedder.embed([query])
    qvec = embed_res.vectors[0]
    # 5.2 Search Milvus
    results = client.search(
        collection_name=COLLECTION,
        data=[qvec],
        limit=top_k,
        output_fields=["doc_id", "title", "chunk"],
        search_params={"metric_type": "COSINE", "params": {"ef": 128}},
    )
    # 5.3 Assemble structured evidence and filter by score threshold
    hits = []
    for hit in results[0]:
        score = hit["distance"] if "distance" in hit else hit.get("score", 0.0)
        if score >= min_score:
            hits.append({
                "doc_id": hit["entity"]["doc_id"],
                "title": hit["entity"]["title"],
                "chunk": hit["entity"]["chunk"],
                "score": float(score),
            })
    return p.ToolResult({"evidence": hits})
# 6) Run Parlant Server and create the Agent + Guidelines
async def main():
    await ensure_collection_and_load()
    async with p.Server() as server:
        agent = await server.create_agent(
            name="Policy Assistant",
            description="Rule-controlled RAG assistant with Milvus Lite",
        )
        # Example variable: current time (can be used in templates or logs)
        @p.tool
        async def get_datetime(context: p.ToolContext) -> p.ToolResult:
            from datetime import datetime
            return p.ToolResult({"now": datetime.now().isoformat()})
        await agent.create_variable(name="current-datetime", tool=get_datetime)
        # Core Guideline 1: Run vector search for factual or policy-related questions
        await agent.create_guideline(
            condition="User asks a factual question about policy, refund, exchange, or shipping",
            action=(
                "Call vector_search with the user's query. "
                "If evidence is found, synthesize an answer by quoting key sentences and cite doc_id/title. "
                "If evidence is insufficient, ask a clarifying question before answering."
            ),
            tools=[vector_search],
        )
        # Core Guideline 2: Use a standardized, structured response when evidence is available
        await agent.create_guideline(
            condition="Evidence is available",
            action=(
                "Answer with the following template:\\n"
                "Summary: provide a concise conclusion.\\n"
                "Key points: 2-3 bullets distilled from evidence.\\n"
                "Sources: list doc_id and title.\\n"
                "Note: if confidence is low, state limitations and ask for clarification."
            ),
            tools=[],
        )
        # Hint: Local Playground URL
        print("Playground: <http://localhost:8800>")
if __name__ == "__main__":
    asyncio.run(main())

第 4 步:运行代码

# Run the main program
python main.py

  • 访问 Playground:
<http://localhost:8800>

现在,您已经使用 Parlant 和 Milvus 成功构建了一个智能问答系统。

Parlant 与 LangChain/LlamaIndex 的对比:它们的区别和协同工作方式

LangChainLlamaIndex 等现有 Agents 框架相比,Parlant 有何不同?

LangChain 和 LlamaIndex 是通用框架。它们提供广泛的组件和集成,是快速原型开发和研究实验的理想选择。但是,当需要在生产中部署时,开发人员往往需要自己构建额外的层,如规则管理、合规性检查和可靠性机制,以保持 Agents 的一致性和可信度。

Parlant 提供内置准则管理、自我批评机制和可解释性工具,帮助开发人员管理 Agents 的行为、响应和原因。这使得 Parlant 特别适用于金融、医疗保健和法律服务等准确性和责任性要求较高、面向客户的使用案例。

事实上,这些框架可以协同工作:

  • 使用 LangChain 构建复杂的数据处理管道或检索工作流。

  • 使用 Parlant 管理最终交互层,确保输出遵循业务规则并保持可解释性。

  • 使用 Milvus 作为向量数据库基础,在整个系统中提供实时语义搜索、记忆和知识检索。

结论

随着 LLM Agents 从实验走向生产,关键问题不再是它们能做什么,而是如何可靠、安全地完成任务。Parlant 为这种可靠性提供了结构和控制,而 Milvus 则提供了可扩展的向量基础架构,使一切都保持快速和上下文感知。

两者结合在一起,开发人员就能构建出不仅有能力,而且值得信赖、可解释并可投入生产的人工智能 Agents。

🚀 在 GitHub 上查看 Parlant,并将其与 Milvus集成,构建自己的智能、规则驱动的 Agents 系统。

有问题或想深入了解任何功能?加入我们的 Discord 频道或在 GitHub 上提交问题。您还可以通过 Milvus Office Hours 预订 20 分钟的一对一课程,以获得见解、指导和问题解答。

    Try Managed Milvus for Free

    Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

    Get Started

    Like the article? Spread the word

    扩展阅读