Milvus
Zilliz
  • Home
  • Blog
  • 如何使用 Google ADK 和 Milvus 构建具有长期记忆功能的生产就绪型人工智能 Agents

如何使用 Google ADK 和 Milvus 构建具有长期记忆功能的生产就绪型人工智能 Agents

  • Tutorials
February 26, 2026
Min Yin

在构建智能 Agents 时,最难解决的问题之一就是内存管理:决定 Agents 应该记住什么,忘记什么。

并不是所有的内存都能持续使用。有些数据只在当前对话中需要,因此在对话结束时就应清除。其他数据,如用户偏好,则必须在不同对话中持续存在。当这些数据混杂在一起时,临时数据就会堆积起来,重要信息就会丢失。

真正的问题在于架构。大多数框架都没有明确区分短期内存和长期内存,开发人员只能手动处理。

2025 年发布的谷歌开源Agents 开发工具包(ADK)通过将内存管理作为头等大事,在框架层面解决了这一问题。它默认将短期会话内存和长期内存分开。

在本文中,我们将了解这种分离在实践中是如何发挥作用的。使用 Milvus 作为向量数据库,我们将从头开始构建一个具有真正长期内存的可投入生产的 Agents。

ADK 的核心设计原则

ADK 的设计目的是让开发人员不再需要管理内存。该框架会自动将短期会话数据与长期内存分开,并分别进行适当处理。它通过四种核心设计方案来实现这一目标。

短期和长期内存的内置接口

每个 ADK Agents 都有两个用于管理内存的内置接口:

会话服务(临时数据)

  • 存储内容:当前对话内容和工具调用的中间结果
  • 何时清除:会话结束时自动清除
  • 存储位置:内存(最快)、数据库或云服务

内存服务(长期内存)

  • 存储内容:应记住的信息,如用户偏好或过去的记录
  • 何时清除:不会自动清除;必须手动删除
  • 存储在哪里?ADK 仅定义了接口;存储后端由您决定(例如 Milvus)

三层架构

ADK 将系统分为三层,每一层都有各自的职责:

  • Agents 层:业务逻辑所在,如 "在回答用户之前检索相关内存"。
  • 运行时层:由框架管理,负责创建和销毁会话,并跟踪每个执行步骤。
  • 服务层:与外部系统集成,如 Milvus 等向量数据库或大型模型 API。

这种结构将关注点分开:业务逻辑在 Agents 中,而存储在其他地方。你可以更新其中一个,而不会破坏其他。

一切都记录为事件

Agents 的每个操作--调用记忆调用工具、调用模型、生成响应--都会被记录为事件

这有两个实际好处。首先,当出现问题时,开发人员可以一步步重放整个交互过程,找到确切的故障点。其次,在审计和合规性方面,系统可提供每次用户交互的完整执行跟踪。

基于前缀的数据范围界定

ADK 使用简单的键前缀控制数据可见性:

  • temp:xxx- 仅在当前会话中可见,会话结束时自动删除
  • user:xxx- 在同一用户的所有会话中共享,实现持久的用户首选项
  • app:xxx- 在所有用户中全局共享,适用于应用范围内的知识,如产品文档

通过使用前缀,开发人员可以控制数据范围,而无需编写额外的访问逻辑。框架会自动处理可见性和生命周期。

Milvus 作为 ADK 的内存后端

在 ADK 中,MemoryService 只是一个接口。它定义了如何使用长期内存,而不是如何存储内存。数据库的选择取决于开发人员。那么,什么样的数据库能很好地用作 Agents 的内存后台呢?

Agents 记忆系统的需求--以及 Milvus 如何提供这些需求

  • 语义检索

需求

用户很少以相同的方式提出相同的问题。"连接不上 "和 "连接超时 "的意思是一样的。记忆系统必须理解含义,而不仅仅是匹配关键字。

Milvus 如何满足它

Milvus 支持多种向量索引类型,如 HNSW 和 DiskANN,允许开发人员选择适合自己工作负载的类型。即使有数千万个向量,查询延迟也能保持在 10 毫秒以内,这对于代理使用来说已经足够快了。

  • 混合查询

需求

记忆调用需要的不仅仅是语义搜索。系统还必须通过 user_id 等结构化字段进行过滤,以便只返回当前用户的数据。

Milvus 如何满足它

Milvus 本机支持将向量搜索与标量过滤相结合的混合查询。例如,它可以检索语义相似的记录,同时在同一查询中应用 user_id = 'xxx' 等过滤器,而不会影响性能或召回质量。

  • 可扩展性

需求

随着用户数量和存储记忆的增加,系统必须平滑扩展。随着数据的增加,性能应保持稳定,而不会突然变慢或出现故障。

Milvus 如何满足这一要求

Milvus 采用计算与存储分离的架构。查询能力可根据需要通过添加查询节点进行横向扩展。即使是在单机上运行的独立版本,也能处理数千万个向量,适合早期阶段的部署。

注:为进行本地开发和测试,本文中的示例使用Milvus LiteMilvus Standalone

使用 Milvus 提供的 Long-TermMemory 构建一个 Agents

在本节中,我们将构建一个简单的技术支持 Agents。当用户提出问题时,Agent 会查找过去类似的支持单来回答,而不是重复同样的工作。

这个例子非常有用,因为它展示了真实代理内存系统必须处理的三个常见问题。

  • 跨会话的长期记忆

今天提出的问题可能与几周前创建的票单有关。Agents 必须记住跨会话的信息,而不仅仅是当前会话中的信息。这就是需要通过 MemoryService 管理长期内存的原因。

  • 用户隔离

每个用户的支持历史必须保持私密。一个用户的数据绝不能出现在另一个用户的结果中。这就需要对 user_id 等字段进行过滤,Milvus 通过混合查询支持这种过滤。

  • 语义匹配

用户会以不同的方式描述相同的问题,如 "无法连接 "或 "超时"。关键词匹配是不够的。Agents 需要进行语义搜索,而向量检索可以提供这种搜索。

环境设置

  • Python 3.11+
  • Docker 和 Docker Compose
  • 双子座 API 密钥

本节涵盖基本设置,以确保程序能正确运行。

pip install google-adk pymilvus google-generativeai  
"""  
ADK + Milvus + Gemini Long-term Memory Agent  
Demonstrates how to implement a cross-session memory recall system  
"""  
import os  
import asyncio  
import time  
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility  
import google.generativeai as genai  
from google.adk.agents import Agent  
from google.adk.tools import FunctionTool  
from google.adk.runners import Runner  
from google.adk.sessions import InMemorySessionService  
from google.genai import types  

步骤 1:部署 Milvus 单机版(Docker)

(1) 下载部署文件

wget <https://github.com/Milvus-io/Milvus/releases/download/v2.5.12/Milvus-standalone-docker-compose.yml> -O docker-compose.yml  

(2) 启动 Milvus 服务

docker-compose up -d  
docker-compose ps -a  

第 2 步:模型和连接配置

配置 Gemini API 和 Milvus 连接设置。

# ==================== Configuration ====================  
# 1. Gemini API configuration  
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")  
if not GOOGLE_API_KEY:  
   raise ValueError("Please set the GOOGLE_API_KEY environment variable")  
genai.configure(api_key=GOOGLE_API_KEY)  
# 2. Milvus connection configuration  
MILVUS_HOST = os.getenv("MILVUS_HOST", "localhost")  
MILVUS_PORT = os.getenv("MILVUS_PORT", "19530")  
# 3. Model selection (best combination within the free tier limits)  
LLM_MODEL = "gemini-2.5-flash-lite"  # LLM model: 1000 RPD  
EMBEDDING_MODEL = "models/text-embedding-004"  # Embedding model: 1000 RPD  
EMBEDDING_DIM = 768  # Vector dimension  
# 4. Application configuration  
APP_NAME = "tech_support"  
USER_ID = "user_123"  
print(f"✓ Using model configuration:")  
print(f"  LLM: {LLM_MODEL}")  
print(f"  Embedding: {EMBEDDING_MODEL} (dimension: {EMBEDDING_DIM})")  

第 3 步 Milvus 数据库初始化

创建向量数据库 Collections(类似于关系数据库中的表格)

# ==================== Initialize Milvus ====================  
def init_milvus():  
   """Initialize Milvus connection and collection"""  
   # Step 1: Establish connection  
   Try:  
       connections.connect(  
           alias="default",  
           host=MILVUS_HOST,  
           port=MILVUS_PORT  
       )  
       print(f"✓ Connected to Milvus: {MILVUS_HOST}:{MILVUS_PORT}")  
   except Exception as e:  
       print(f"✗ Failed to connect to Milvus: {e}")  
       print("Hint: make sure Milvus is running")  
       Raise  
   # Step 2: Define data schema  
   fields = [  
       FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),  
       FieldSchema(name="user_id", dtype=DataType.VARCHAR, max_length=100),  
       FieldSchema(name="session_id", dtype=DataType.VARCHAR, max_length=100),  
       FieldSchema(name="question", dtype=DataType.VARCHAR, max_length=2000),  
       FieldSchema(name="solution", dtype=DataType.VARCHAR, max_length=5000),  
       FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=EMBEDDING_DIM),  
       FieldSchema(name="timestamp", dtype=DataType.INT64)  
   ]  
   schema = CollectionSchema(fields, description="Tech support memory")  
   collection_name = "support_memory"  
   # Step 3: Create or load the collection  
   if utility.has_collection(collection_name):  
       memory_collection = Collection(name=collection_name)  
       print(f"✓ Collection '{collection_name}' already exists")  
   Else:  
       memory_collection = Collection(name=collection_name, schema=schema)  
   # Step 4: Create vector index  
   index_params = {  
       "index_type": "IVF_FLAT",  
       "metric_type": "COSINE",  
       "params": {"nlist": 128}  
   }  
   memory_collection.create_index(field_name="embedding", index_params=index_params)  
   print(f"✓ Created collection '{collection_name}' and index")  
   return memory_collection  
# Run initialization  
memory_collection = init_milvus()  

第 4 步 存储器操作符

将存储和检索逻辑封装为 Agents 的工具。

(1) 存储记忆功能

# ==================== Memory Operation Functions ====================  
def store_memory(question: str, solution: str) -> str:  
   """  
   Store a solution record into the memory store  
   Args:  
       question: the user's question  
       solution: the solution  
   Returns:  
       str: result message  
   """  
   Try:  
       print(f"\\n[Tool Call] store_memory")  
       print(f" - question: {question[:50]}...")  
       print(f" - solution: {solution[:50]}...")  
       # Use global USER_ID (in production, this should come from ToolContext)  
       user_id = USER_ID  
       session_id = f"session_{int(time.time())}"  
       # Key step 1: convert the question into a 768-dimensional vector  
       embedding_result = genai.embed_content(  
           model=EMBEDDING_MODEL,  
           content=question,  
           task_type="retrieval_document",  # specify document indexing task  
           output_dimensionality=EMBEDDING_DIM  
       )  
       embedding = embedding_result["embedding"]  
       # Key step 2: insert into Milvus  
       memory_collection.insert([{  
           "user_id": user_id,  
           "session_id": session_id,  
           "question": question,  
           "solution": solution,  
           "embedding": embedding,  
           "timestamp": int(time.time())  
       }])  
       # Key step 3: flush to disk (ensure data persistence)  
       memory_collection.flush()  
       result = "✓ Successfully stored in memory"  
       print(f"[Tool Result] {result}")  
       return result  
   except Exception as e:  
       error_msg = f"✗ Storage failed: {str(e)}"  
       print(f"[Tool Error] {error_msg}")  
       return error_msg  

(2) 检索内存功能

def recall_memory(query: str, top_k: int = 3) -> str:  
   """  
   Retrieve relevant historical cases from the memory store  
   Args:  
       query: query question  
       top_k: number of most similar results to return  
   Returns:  
       str: retrieval result  
   """  
   Try:  
       print(f"\\n[Tool Call] recall_memory")  
       print(f" - query: {query}")  
       print(f" - top_k: {top_k}")  
       user_id = USER_ID  
       # Key step 1: convert the query into a vector  
       embedding_result = genai.embed_content(  
           model=EMBEDDING_MODEL,  
           content=query,  
           task_type="retrieval_query",  # specify query task (different from indexing)  
           output_dimensionality=EMBEDDING_DIM  
       )  
       query_embedding = embedding_result["embedding"]  
       # Key step 2: load the collection into memory (required for the first query)  
       memory_collection.load()  
       # Key step 3: hybrid search (vector similarity + scalar filtering)  
       results = memory_collection.search(  
           data=[query_embedding],  
           anns_field="embedding",  
           param={"metric_type": "COSINE", "params": {"nprobe": 10}},  
           limit=top_k,  
           expr=f'user_id == "{user_id}"',  # 🔑 key to user isolation  
           output_fields=["question", "solution", "timestamp"]  
       )  
       # Key step 4: format results  
       if not results[0]:  
           result = "No relevant historical cases found"  
           print(f"[Tool Result] {result}")  
           return result  
       result_text = f"Found {len(results[0])} relevant cases:\\n\\n"  
       for i, hit in enumerate(results[0]):  
           result_text += f"Case {i+1} (similarity: {hit.score:.2f}):\\n"  
           result_text += f"Question: {hit.entity.get('question')}\\n"  
           result_text += f"Solution: {hit.entity.get('solution')}\\n\\n"  
       print(f"[Tool Result] Found {len(results[0])} cases")  
       return result_text  
   except Exception as e:  
       error_msg = f"Retrieval failed: {str(e)}"  
       print(f"[Tool Error] {error_msg}")  
       return error_msg  

(3) 注册为 ADK 工具

# Usage  
# Wrap functions with FunctionTool  
store_memory_tool = FunctionTool(func=store_memory)  
recall_memory_tool = FunctionTool(func=recall_memory)  
memory_tools = [store_memory_tool, recall_memory_tool]  

第 5 步 代理定义

核心思想:定义 Agents 的行为逻辑。

# ==================== Create Agent ====================  
support_agent = Agent(  
   model=LLM_MODEL,  
   name="support_agent",  
   description="Technical support expert agent that can remember and recall historical cases",  
   # Key: the instruction defines the agent’s behavior  
   instruction="""  
You are a technical support expert. Strictly follow the process below:  
<b>When the user asks a technical question:</b>  
1. Immediately call the recall_memory tool to search for historical cases  
  - Parameter query: use the user’s question text directly  
  - Do not ask for any additional information; call the tool directly  
2. Answer based on the retrieval result:  
  - If relevant cases are found: explain that similar historical cases were found and answer by referencing their solutions  
  - If no cases are found: explain that this is a new issue and answer based on your own knowledge  
3. After answering, ask: “Did this solution resolve your issue?”  
<b>When the user confirms the issue is resolved:</b>  
- Immediately call the store_memory tool to save this Q&A  
- Parameter question: the user’s original question  
- Parameter solution: the complete solution you provided  
<b>Important rules:</b>  
- You must call a tool before answering  
- Do not ask for user_id or any other parameters  
- Only store memory when you see confirmation phrases such as “resolved”, “it works”, or “thanks”  
""",  
   tools=memory_tools  
)  

第 6 步 主程序和执行流程

演示跨会话内存检索的完整过程。

# ==================== Main Program ====================  
async def main():  
   """Demonstrate cross-session memory recall"""  
   # Create Session service and Runner  
   session_service = InMemorySessionService()  
   runner = Runner(  
       agent=support_agent,  
       app_name=APP_NAME,  
       session_service=session_service  
   )  
   # ========== First round: build memory ==========  
   print("\\n" + "=" \* 60)  
   print("First conversation: user asks a question and the solution is stored")  
   print("=" \* 60)  
   session1 = await session_service.create_session(  
       app_name=APP_NAME,  
       user_id=USER_ID,  
       session_id="session_001"  
   )  
   # User asks the first question  
   print("\\n[User]: What should I do if Milvus connection times out?")  
   content1 = types.Content(  
       role='user',  
       parts=[types.Part(text="What should I do if Milvus connection times out?")]  
   )  
   async for event in runner.run_async(  
       user_id=USER_ID,  
       session_id=[session1.id](http://session1.id),  
       new_message=content1  
   ):  
       if event.content and event.content.parts:  
           for part in event.content.parts:  
               if hasattr(part, 'text') and part.text:  
                   print(f"[Agent]: {part.text}")  
   # User confirms the issue is resolved  
   print("\\n[User]: The issue is resolved, thanks!")  
   content2 = types.Content(  
       role='user',  
       parts=[types.Part(text="The issue is resolved, thanks!")]  
   )  
   async for event in runner.run_async(  
       user_id=USER_ID,  
       session_id=[session1.id](http://session1.id),  
       new_message=content2  
   ):  
       if event.content and event.content.parts:  
           for part in event.content.parts:  
               if hasattr(part, 'text') and part.text:  
                   print(f"[Agent]: {part.text}")  
   # ========== Second round: recall memory ==========  
   print("\\n" + "=" \* 60)  
   print("Second conversation: new session with memory recall")  
   print("=" \* 60)  
   session2 = await session_service.create_session(  
       app_name=APP_NAME,  
       user_id=USER_ID,  
       session_id="session_002"  
   )  
   # User asks a similar question in a new session  
   print("\\n[User]: Milvus can't connect")  
   content3 = types.Content(  
       role='user',  
       parts=[types.Part(text="Milvus can't connect")]  
   )  
   async for event in runner.run_async(  
       user_id=USER_ID,  
       session_id=[session2.id](http://session2.id),  
       new_message=content3  
   ):  
       if event.content and event.content.parts:  
           for part in event.content.parts:  
               if hasattr(part, 'text') and part.text:  
                   print(f"[Agent]: {part.text}")

# Program entry point if name == "main":
Try:
asyncio.run(main())
except KeyboardInterrupt:
print(“\n\nProgram exited”)
except Exception as e:
print(f"\n\nProgram error: {e}")
import traceback
traceback.print_exc()
Finally:
Try:
connections.disconnect(alias=“default”)
print(“\n✓ Disconnected from Milvus”)
Except:
pass

第 7 步 运行和测试

(1) 设置环境变量

export GOOGLE_API_KEY="your-gemini-api-key"  
python milvus_agent.py  

预期输出

输出结果显示了内存系统在实际使用中的运行情况。

在第一次对话中,用户询问如何处理 Milvus 连接超时。Agents 给出了一个解决方案。在用户确认问题已解决后,Agent 将问题和答案保存到内存中。

在第二次对话中,一个新的会话开始了。用户用不同的词语提出了同样的问题:"Milvus 无法连接"。Agents 会自动从内存中检索类似的案例,并给出相同的解决方案。

无需人工操作。Agents 决定何时检索过去的案例,何时存储新案例,这显示了它的三大能力:跨会话记忆、语义匹配和用户隔离。

结论

ADK 在框架层面使用会话服务(SessionService)和内存服务(MemoryService)分离了短期上下文和长期记忆。Milvus通过基于向量的检索来处理语义搜索和用户级过滤。

选择框架时,目标很重要。如果需要强大的状态隔离、可审计性和生产稳定性,ADK 会更适合。如果您正在进行原型设计或实验,LangChain(一种流行的 Python 框架,用于快速构建基于 LLM 的应用程序和 Agents)则能提供更多灵活性。

对于 Agents 内存来说,关键的一环是数据库。无论使用哪种框架,语义记忆都依赖于向量数据库。Milvus 运行良好,因为它开源、本地运行、支持在单台机器上处理数十亿向量,并支持混合向量、标量和全文检索。这些功能涵盖了早期测试和生产使用。

希望本文能帮助您更好地理解代理内存设计,并为您的项目选择合适的工具。

如果您正在构建需要真正内存的人工智能 Agents,而不仅仅是更大的上下文窗口,我们很想听听您是如何处理的。

如果您对 ADK、Agents 内存设计或使用 Milvus 作为内存后端有任何疑问?加入我们的Slack 频道,或预约 20 分钟的Milvus Office Hours会议,讨论您的使用案例。

    Try Managed Milvus for Free

    Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

    Get Started

    Like the article? Spread the word

    扩展阅读