Milvus
Zilliz
  • Home
  • Blog
  • 如何使用 Google ADK 和 Milvus 建立具備長期記憶體的生產就緒 AI 代理

如何使用 Google ADK 和 Milvus 建立具備長期記憶體的生產就緒 AI 代理

  • Tutorials
February 26, 2026
Min Yin

在建立智慧型代理程式時,最困難的問題之一就是記憶體管理:決定代理程式應該記住什麼、忘記什麼。

並不是所有的記憶體都可以持續使用。有些資料僅在目前的會話中需要,當會話結束時就應該清除。其他資料,例如使用者偏好,則必須在不同的會話中持續存在。當這些資料混雜在一起時,臨時資料就會堆積如山,而重要的資訊就會遺失。

真正的問題在於架構。大多數架構都沒有強制執行短期記憶體與長期記憶體之間的明確分隔,讓開發人員必須手動處理。

Google 於 2025 年推出的開放原始碼Agent Development Kit (ADK),將記憶體管理列為頭等級問題,從架構層級解決這個問題。它強制執行短期會話記憶體與長期記憶體之間的預設分離。

在這篇文章中,我們將探討這種分離在實際中是如何運作的。使用 Milvus 作為向量資料庫,我們將從頭開始建立一個具備真正長期記憶體的生產就緒代理程式。

ADK 的核心設計原則

ADK 的設計目的是讓開發人員不再需要管理記憶體。此框架會自動將短期會話資料與長期記憶體分開,並適當地處理每一種記憶體。它透過四個核心設計選擇來達到這個目的。

短期和長期記憶體的內建介面

每個 ADK 代理都有兩個管理記憶體的內建介面:

SessionService (臨時資料)

  • 儲存內容:目前的對話內容和工具呼叫的中間結果
  • 何時清除:會話結束時自動清除
  • 儲存位置:記憶體 (最快)、資料庫或雲端服務

MemoryService (長期記憶體)

  • 儲存內容:應記住的資訊,例如使用者偏好或過去的記錄
  • 何時清除:不會自動清除;必須手動刪除
  • 儲存位置:ADK 只定義了介面;儲存後端則由您決定 (例如 Milvus)

三層架構

ADK 將系統分成三層,每層都有單一的責任:

  • Agent 層:業務邏輯所在,例如「在回覆使用者之前擷取相關記憶體」。
  • Runtime 層:由架構管理,負責建立及銷毀會話,並追蹤執行的每個步驟。
  • 服務層:與外部系統整合,例如 Milvus 之類的向量資料庫或大型模型 API。

此結構將關注點分開:業務邏輯存在於代理程式中,而儲存則存在於其他地方。您可以更新其中一個而不破壞另一個。

一切都以事件形式記錄

代理程式的每個動作 - 呼叫記憶體回憶工具、調用模型、產生回應 - 都會被記錄為事件

這有兩個實際的好處。首先,當出現問題時,開發人員可以逐步重播整個互動過程,找出確切的故障點。其次,為了審計和合規性,系統提供每個使用者互動的完整執行追蹤。

基於前綴的資料範圍

ADK 使用簡單的關鍵前綴來控制資料的可見性:

  • temp:xxx- 僅在當前會話中可見,並在會話結束時自動移除
  • user:xxx- 在同一使用者的所有會話中共用,使使用者偏好設定持續存在
  • app:xxx- 在所有使用者之間全局共用,適用於應用程式範圍內的知識,例如產品文件

透過使用前綴,開發人員可以控制資料範圍,而無需撰寫額外的存取邏輯。框架會自動處理可見性和生命週期。

Milvus 作為 ADK 的記憶體後端

在 ADK 中,MemoryService 只是一個介面。它定義了長期記憶體的使用方式,但沒有定義記憶體的儲存方式。資料庫的選擇取決於開發者。那麼,哪種資料庫可以很好地當作代理的記憶體後端呢?

代理程式記憶體系統需要什麼 - 以及 Milvus 如何提供

  • 語義檢索

需求

使用者很少會以相同的方式提出相同的問題。「無法連線 」和 「連線超時 」的意思是一樣的。記憶體系統必須理解其意義,而不僅僅是匹配關鍵字。

Milvus 如何滿足它

Milvus 支援許多向量索引類型,例如 HNSW 和 DiskANN,讓開發人員可以選擇適合他們工作負載的類型。即使有數以千萬計的向量,查詢延遲仍可維持在 10 毫秒以下,這對於代理使用來說已經夠快了。

  • 混合查詢

需求

記憶體召回需要的不只是語意搜尋。系統還必須透過 user_id 等結構化欄位進行篩選,以便僅傳回當前使用者的資料。

Milvus 如何滿足它

Milvus 本機支援混合查詢,結合向量搜尋與標量篩選。例如,它可以擷取語意相似的記錄,同時在同一查詢中套用 user_id = 'xxx' 等篩選條件,而不會損害效能或召回品質。

  • 擴充性

需求

隨著使用者和儲存記憶體數量的增加,系統必須平順地擴充。效能應隨著資料的增加而保持穩定,不會突然變慢或發生故障。

Milvus 如何滿足需求:

Milvus 採用運算與儲存分離的架構。查詢容量可根據需要透過增加查詢節點進行水平擴展。即使是在單一機器上執行的獨立版本,也能處理數以千萬計的向量,因此適用於早期階段的部署。

注意:對於本機開發和測試,本文的範例使用Milvus LiteMilvus Standalone

使用 Milvus 提供的 Long-TermMemory 建立一個代理程式

在本節中,我們將建立一個簡單的技術支援代理。當使用者提出問題時,代理程式會尋找過去類似的支援票單進行回答,而不會重複相同的工作。

這個範例很有用,因為它顯示了真正的代理程式記憶體系統必須處理的三個常見問題。

  • 跨會話的長期記憶

今天提出的問題可能與幾週前建立的票單有關。代理必須跨會話記憶資訊,而不只是在目前的會話中。這就是需要透過 MemoryService 管理長期記憶體的原因。

  • 使用者隔離

每個使用者的支援歷史必須保持隱私。一個使用者的資料絕對不能出現在另一個使用者的結果中。這需要對 user_id 等欄位進行過濾,Milvus 可透過混合查詢支援此功能。

  • 語意匹配

使用者會以不同的方式描述相同的問題,例如「無法連線」或「超時」。僅僅關鍵字匹配是不夠的。代理需要語意搜尋,而向量檢索可提供語意搜尋。

環境設定

  • Python 3.11+
  • Docker 和 Docker Compose
  • Gemini API 金鑰

本節涵蓋基本設定,以確保程式能正常執行。

pip install google-adk pymilvus google-generativeai  
"""  
ADK + Milvus + Gemini Long-term Memory Agent  
Demonstrates how to implement a cross-session memory recall system  
"""  
import os  
import asyncio  
import time  
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility  
import google.generativeai as genai  
from google.adk.agents import Agent  
from google.adk.tools import FunctionTool  
from google.adk.runners import Runner  
from google.adk.sessions import InMemorySessionService  
from google.genai import types  

步驟 1:部署 Milvus 單機 (Docker)

(1) 下載部署檔案

wget <https://github.com/Milvus-io/Milvus/releases/download/v2.5.12/Milvus-standalone-docker-compose.yml> -O docker-compose.yml  

(2) 啟動 Milvus 服務

docker-compose up -d  
docker-compose ps -a  

步驟 2 模型與連線設定

配置 Gemini API 和 Milvus 連線設定。

# ==================== Configuration ====================  
# 1. Gemini API configuration  
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")  
if not GOOGLE_API_KEY:  
   raise ValueError("Please set the GOOGLE_API_KEY environment variable")  
genai.configure(api_key=GOOGLE_API_KEY)  
# 2. Milvus connection configuration  
MILVUS_HOST = os.getenv("MILVUS_HOST", "localhost")  
MILVUS_PORT = os.getenv("MILVUS_PORT", "19530")  
# 3. Model selection (best combination within the free tier limits)  
LLM_MODEL = "gemini-2.5-flash-lite"  # LLM model: 1000 RPD  
EMBEDDING_MODEL = "models/text-embedding-004"  # Embedding model: 1000 RPD  
EMBEDDING_DIM = 768  # Vector dimension  
# 4. Application configuration  
APP_NAME = "tech_support"  
USER_ID = "user_123"  
print(f"✓ Using model configuration:")  
print(f"  LLM: {LLM_MODEL}")  
print(f"  Embedding: {EMBEDDING_MODEL} (dimension: {EMBEDDING_DIM})")  

第 3 步 Milvus 資料庫初始化

建立向量資料庫集合(類似關係資料庫中的表格)

# ==================== Initialize Milvus ====================  
def init_milvus():  
   """Initialize Milvus connection and collection"""  
   # Step 1: Establish connection  
   Try:  
       connections.connect(  
           alias="default",  
           host=MILVUS_HOST,  
           port=MILVUS_PORT  
       )  
       print(f"✓ Connected to Milvus: {MILVUS_HOST}:{MILVUS_PORT}")  
   except Exception as e:  
       print(f"✗ Failed to connect to Milvus: {e}")  
       print("Hint: make sure Milvus is running")  
       Raise  
   # Step 2: Define data schema  
   fields = [  
       FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),  
       FieldSchema(name="user_id", dtype=DataType.VARCHAR, max_length=100),  
       FieldSchema(name="session_id", dtype=DataType.VARCHAR, max_length=100),  
       FieldSchema(name="question", dtype=DataType.VARCHAR, max_length=2000),  
       FieldSchema(name="solution", dtype=DataType.VARCHAR, max_length=5000),  
       FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=EMBEDDING_DIM),  
       FieldSchema(name="timestamp", dtype=DataType.INT64)  
   ]  
   schema = CollectionSchema(fields, description="Tech support memory")  
   collection_name = "support_memory"  
   # Step 3: Create or load the collection  
   if utility.has_collection(collection_name):  
       memory_collection = Collection(name=collection_name)  
       print(f"✓ Collection '{collection_name}' already exists")  
   Else:  
       memory_collection = Collection(name=collection_name, schema=schema)  
   # Step 4: Create vector index  
   index_params = {  
       "index_type": "IVF_FLAT",  
       "metric_type": "COSINE",  
       "params": {"nlist": 128}  
   }  
   memory_collection.create_index(field_name="embedding", index_params=index_params)  
   print(f"✓ Created collection '{collection_name}' and index")  
   return memory_collection  
# Run initialization  
memory_collection = init_milvus()  

步驟 4 記憶體操作功能

將儲存和檢索邏輯封裝成代理的工具。

(1) 儲存記憶體功能

# ==================== Memory Operation Functions ====================  
def store_memory(question: str, solution: str) -> str:  
   """  
   Store a solution record into the memory store  
   Args:  
       question: the user's question  
       solution: the solution  
   Returns:  
       str: result message  
   """  
   Try:  
       print(f"\\n[Tool Call] store_memory")  
       print(f" - question: {question[:50]}...")  
       print(f" - solution: {solution[:50]}...")  
       # Use global USER_ID (in production, this should come from ToolContext)  
       user_id = USER_ID  
       session_id = f"session_{int(time.time())}"  
       # Key step 1: convert the question into a 768-dimensional vector  
       embedding_result = genai.embed_content(  
           model=EMBEDDING_MODEL,  
           content=question,  
           task_type="retrieval_document",  # specify document indexing task  
           output_dimensionality=EMBEDDING_DIM  
       )  
       embedding = embedding_result["embedding"]  
       # Key step 2: insert into Milvus  
       memory_collection.insert([{  
           "user_id": user_id,  
           "session_id": session_id,  
           "question": question,  
           "solution": solution,  
           "embedding": embedding,  
           "timestamp": int(time.time())  
       }])  
       # Key step 3: flush to disk (ensure data persistence)  
       memory_collection.flush()  
       result = "✓ Successfully stored in memory"  
       print(f"[Tool Result] {result}")  
       return result  
   except Exception as e:  
       error_msg = f"✗ Storage failed: {str(e)}"  
       print(f"[Tool Error] {error_msg}")  
       return error_msg  

(2) 擷取記憶體功能

def recall_memory(query: str, top_k: int = 3) -> str:  
   """  
   Retrieve relevant historical cases from the memory store  
   Args:  
       query: query question  
       top_k: number of most similar results to return  
   Returns:  
       str: retrieval result  
   """  
   Try:  
       print(f"\\n[Tool Call] recall_memory")  
       print(f" - query: {query}")  
       print(f" - top_k: {top_k}")  
       user_id = USER_ID  
       # Key step 1: convert the query into a vector  
       embedding_result = genai.embed_content(  
           model=EMBEDDING_MODEL,  
           content=query,  
           task_type="retrieval_query",  # specify query task (different from indexing)  
           output_dimensionality=EMBEDDING_DIM  
       )  
       query_embedding = embedding_result["embedding"]  
       # Key step 2: load the collection into memory (required for the first query)  
       memory_collection.load()  
       # Key step 3: hybrid search (vector similarity + scalar filtering)  
       results = memory_collection.search(  
           data=[query_embedding],  
           anns_field="embedding",  
           param={"metric_type": "COSINE", "params": {"nprobe": 10}},  
           limit=top_k,  
           expr=f'user_id == "{user_id}"',  # 🔑 key to user isolation  
           output_fields=["question", "solution", "timestamp"]  
       )  
       # Key step 4: format results  
       if not results[0]:  
           result = "No relevant historical cases found"  
           print(f"[Tool Result] {result}")  
           return result  
       result_text = f"Found {len(results[0])} relevant cases:\\n\\n"  
       for i, hit in enumerate(results[0]):  
           result_text += f"Case {i+1} (similarity: {hit.score:.2f}):\\n"  
           result_text += f"Question: {hit.entity.get('question')}\\n"  
           result_text += f"Solution: {hit.entity.get('solution')}\\n\\n"  
       print(f"[Tool Result] Found {len(results[0])} cases")  
       return result_text  
   except Exception as e:  
       error_msg = f"Retrieval failed: {str(e)}"  
       print(f"[Tool Error] {error_msg}")  
       return error_msg  

(3) 註冊為 ADK 工具

# Usage  
# Wrap functions with FunctionTool  
store_memory_tool = FunctionTool(func=store_memory)  
recall_memory_tool = FunctionTool(func=recall_memory)  
memory_tools = [store_memory_tool, recall_memory_tool]  

步驟 5 代理程式定義

核心思想:定義代理程式的行為邏輯。

# ==================== Create Agent ====================  
support_agent = Agent(  
   model=LLM_MODEL,  
   name="support_agent",  
   description="Technical support expert agent that can remember and recall historical cases",  
   # Key: the instruction defines the agent’s behavior  
   instruction="""  
You are a technical support expert. Strictly follow the process below:  
<b>When the user asks a technical question:</b>  
1. Immediately call the recall_memory tool to search for historical cases  
  - Parameter query: use the user’s question text directly  
  - Do not ask for any additional information; call the tool directly  
2. Answer based on the retrieval result:  
  - If relevant cases are found: explain that similar historical cases were found and answer by referencing their solutions  
  - If no cases are found: explain that this is a new issue and answer based on your own knowledge  
3. After answering, ask: “Did this solution resolve your issue?”  
<b>When the user confirms the issue is resolved:</b>  
- Immediately call the store_memory tool to save this Q&A  
- Parameter question: the user’s original question  
- Parameter solution: the complete solution you provided  
<b>Important rules:</b>  
- You must call a tool before answering  
- Do not ask for user_id or any other parameters  
- Only store memory when you see confirmation phrases such as “resolved”, “it works”, or “thanks”  
""",  
   tools=memory_tools  
)  

步驟 6 主程式與執行流程

示範跨會話記憶體擷取的完整流程。

# ==================== Main Program ====================  
async def main():  
   """Demonstrate cross-session memory recall"""  
   # Create Session service and Runner  
   session_service = InMemorySessionService()  
   runner = Runner(  
       agent=support_agent,  
       app_name=APP_NAME,  
       session_service=session_service  
   )  
   # ========== First round: build memory ==========  
   print("\\n" + "=" \* 60)  
   print("First conversation: user asks a question and the solution is stored")  
   print("=" \* 60)  
   session1 = await session_service.create_session(  
       app_name=APP_NAME,  
       user_id=USER_ID,  
       session_id="session_001"  
   )  
   # User asks the first question  
   print("\\n[User]: What should I do if Milvus connection times out?")  
   content1 = types.Content(  
       role='user',  
       parts=[types.Part(text="What should I do if Milvus connection times out?")]  
   )  
   async for event in runner.run_async(  
       user_id=USER_ID,  
       session_id=[session1.id](http://session1.id),  
       new_message=content1  
   ):  
       if event.content and event.content.parts:  
           for part in event.content.parts:  
               if hasattr(part, 'text') and part.text:  
                   print(f"[Agent]: {part.text}")  
   # User confirms the issue is resolved  
   print("\\n[User]: The issue is resolved, thanks!")  
   content2 = types.Content(  
       role='user',  
       parts=[types.Part(text="The issue is resolved, thanks!")]  
   )  
   async for event in runner.run_async(  
       user_id=USER_ID,  
       session_id=[session1.id](http://session1.id),  
       new_message=content2  
   ):  
       if event.content and event.content.parts:  
           for part in event.content.parts:  
               if hasattr(part, 'text') and part.text:  
                   print(f"[Agent]: {part.text}")  
   # ========== Second round: recall memory ==========  
   print("\\n" + "=" \* 60)  
   print("Second conversation: new session with memory recall")  
   print("=" \* 60)  
   session2 = await session_service.create_session(  
       app_name=APP_NAME,  
       user_id=USER_ID,  
       session_id="session_002"  
   )  
   # User asks a similar question in a new session  
   print("\\n[User]: Milvus can't connect")  
   content3 = types.Content(  
       role='user',  
       parts=[types.Part(text="Milvus can't connect")]  
   )  
   async for event in runner.run_async(  
       user_id=USER_ID,  
       session_id=[session2.id](http://session2.id),  
       new_message=content3  
   ):  
       if event.content and event.content.parts:  
           for part in event.content.parts:  
               if hasattr(part, 'text') and part.text:  
                   print(f"[Agent]: {part.text}")

# Program entry point if name == "main":
Try:
asyncio.run(main())
except KeyboardInterrupt:
print(“\n\nProgram exited”)
except Exception as e:
print(f"\n\nProgram error: {e}")
import traceback
traceback.print_exc()
Finally:
Try:
connections.disconnect(alias=“default”)
print(“\n✓ Disconnected from Milvus”)
Except:
pass

步驟 7 執行與測試

(1) 設定環境變數

export GOOGLE_API_KEY="your-gemini-api-key"  
python milvus_agent.py  

預期輸出

輸出顯示記憶體系統在實際使用中的運作情況。

在第一次對話中,使用者詢問如何處理 Milvus 連線超時。代理提供了一個解決方案。在用戶確認問題解決後,代理將這個問題和答案保存到記憶體中。

在第二次對話中,一個新的會話開始了。用戶用不同的詞語提出相同的問題:「Milvus 無法連線」。代理程式會自動從記憶體中擷取相似的案例,並提供相同的解決方案。

無需手動步驟。代理程式會決定何時檢索過去的案例,何時儲存新的案例,這顯示了三種關鍵能力:跨會話記憶、語義匹配和使用者隔離。

結論

ADK 在架構層級使用 SessionService 和 MemoryService 分離短期上下文和長期記憶。Milvus 則透過向量式檢索處理語意搜尋與使用者層級過濾。

選擇框架時,目標很重要。如果您需要強大的狀態隔離、可稽核性和生產穩定性,ADK 會比較適合。如果您是在做原型或實驗,LangChain(一個很受歡迎的 Python 框架,用來快速建構基於 LLM 的應用程式和代理程式)則能提供更多的彈性。

對於代理記憶體而言,最關鍵的部分就是資料庫。無論您使用哪個框架,語意記憶體都有賴於向量資料庫。Milvus 運作良好,因為它是開放原始碼、可在本機執行、支援在單一機器上處理數十億個向量,並且支援混合向量、標量和全文搜尋。這些功能涵蓋早期測試和生產使用。

我們希望這篇文章能幫助您更好地瞭解代理程式記憶體設計,並為您的專案選擇合適的工具。

如果您正在建置需要真正記憶體的 AI 代理,而不只是更大的上下文視窗,我們很樂意聽取您的做法。

對於 ADK、代理程式記憶體設計或使用 Milvus 作為記憶體後端有任何疑問?加入我們的Slack 頻道,或預約 20 分鐘的Milvus Office Hours 課程,討論您的使用個案。

    Try Managed Milvus for Free

    Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

    Get Started

    Like the article? Spread the word

    繼續閱讀