Google ADKとmilvusを使った長期記憶機能を持つAIエージェントの作り方
インテリジェント・エージェントを構築する際、最も難しい問題のひとつがメモリー管理である。
すべてのメモリが長持ちするわけではありません。あるデータは現在の会話にのみ必要であり、会話が終了したら消去されるべきです。他のデータは、ユーザー設定のように、会話をまたいで持続しなければなりません。これらが混在すると、一時的なデータが積み重なり、重要な情報が失われてしまう。
本当の問題はアーキテクチャにある。ほとんどのフレームワークでは、短期メモリと長期メモリの明確な分離が強制されておらず、開発者はそれを手作業で処理しなければならない。
グーグルが2025年にリリースしたオープンソースのAgent Development Kit(ADK)は、メモリ管理を第一級の関心事にすることで、フレームワークレベルでこれに取り組んでいる。短期的なセッション・メモリと長期的なメモリをデフォルトで分離している。
この記事では、この分離が実際にどのように機能するかを見ていこう。Milvusをベクタデータベースとして使用し、実際の長期メモリを持つ量産可能なエージェントをゼロから構築します。
ADKのコア設計原則
ADKは、開発者の負担を軽減するために設計されています。フレームワークは、短期的なセッションデータと長期的なメモリを自動的に分離し、それぞれを適切に処理します。これは、4つのコアとなる設計の選択によって実現されます。
短期および長期メモリ用の組み込みインタフェース
すべての ADK エージェントには、メモリを管理するための 2 つの組み込みインタフェースが付属しています:
SessionService (一時データ)
- 保存するもの: 現在の会話内容とツール呼び出しからの中間結果
- クリアされるタイミング: セッション終了時に自動的にクリアされる
- 保存場所:メモリ(最速)、データベース、クラウドサービス
MemoryService(長期記憶)
- 保存されるもの:ユーザー設定や過去の記録など、記憶されるべき情報
- 消去時期: 自動的には消去されない。手動で削除する必要がある。
- どこに保存されるか:ADKが定義するのはインターフェイスのみで、ストレージのバックエンドはユーザー次第(例えばmilvus)。
3層のアーキテクチャ
ADKはシステムを3つのレイヤーに分け、それぞれに責任を持たせる:
- エージェント層:"ユーザーに応答する前に関連メモリを検索する "などのビジネスロジックが存在する。
- ランタイム・レイヤー:フレームワークによって管理され、セッションの作成と破棄、実行の各ステップの追跡を担当。
- サービスレイヤー:Milvusのようなベクターデータベースや大規模なモデルAPIのような外部システムと統合します。
この構造により、ビジネスロジックはエージェント内に、ストレージは別の場所に格納されます。ビジネスロジックはエージェントにあり、ストレージは別の場所にあります。
すべてがイベントとして記録される
エージェントが行うすべてのアクション(記憶呼び出しツールの呼び出し、モデルの起動、レスポンスの生成)は、イベントとして記録されます。
これには2つの実用的な利点があります。第一に、何か問題が発生したとき、開発者はインタラクション全体をステップごとに再生し、正確な障害ポイントを見つけることができます。第二に、監査とコンプライアンスのために、システムは各ユーザー・インタラクションの完全な実行トレースを提供する。
プレフィックス・ベースのデータ・スコープ
ADKは、単純なキー接頭辞を使用してデータの可視性を制御します:
- temp:xxx- 現在のセッション内でのみ表示され、セッションが終了すると自動的に削除されます。
- user:xxx- 同一ユーザーの全セッションで共有され、永続的なユーザー設定が可能。
- app:xxx- 全ユーザでグローバルに共有され、製品ドキュメントのようなアプリケーション全体のナレッジに適しています。
プレフィックスを使用することで、開発者は余分なアクセスロジックを記述することなくデータスコープを制御できます。フレームワークは可視性とライフタイムを自動的に処理します。
ADKのメモリバックエンドとしてのMilvus
ADKでは、MemoryServiceは単なるインターフェースです。長期メモリがどのように使用されるかは定義されているが、どのように保存されるかは定義されていない。データベースの選択は開発者次第だ。では、どのようなデータベースがエージェントのメモリバックエンドとしてうまく機能するのでしょうか?
エージェントの記憶システムに必要なもの-そしてmilvusの実現方法
- 意味検索
必要性
ユーザが同じ質問を同じようにすることは稀です。「接続できない "と "接続タイムアウト "は同じ意味です。記憶システムはキーワードにマッチするだけでなく、意味を理解しなければならない。
Milvusはそれをどのように満たすか:
MilvusはHNSWやDiskANNなど多くのベクトルインデックスをサポートしており、開発者はワークロードに合ったものを選択することができます。数千万のベクトルでも、クエリのレイテンシは10ミリ秒以下であり、エージェントの使用には十分な速度である。
- ハイブリッドクエリー
必要性
記憶を呼び起こすには、セマンティック検索以上のものが必要です。システムは、user_idのような構造化フィールドでフィルタリングし、現在のユーザーのデータのみを返す必要がある。
Milvusの対応方法:
Milvusはベクトル検索とスカラーフィルタリングを組み合わせたハイブリッドクエリをネイティブでサポートしている。例えば、同じクエリ内でuser_id = 'xxx'のようなフィルタを適用しながら、パフォーマンスや想起品質を損なうことなく、意味的に類似したレコードを検索することができます。
- スケーラビリティ
必要性:
ユーザー数や保存メモリが増加するにつれて、システムはスムーズにスケールしなければならない。突然の速度低下や障害が発生することなく、データが増加しても安定したパフォーマンスを維持する必要があります。
Milvusはどのようにそれを満たすか:
Milvusは計算とストレージを分離したアーキテクチャを採用している。必要に応じてクエリーノードを追加することで、クエリー容量を水平方向に拡張することができる。1台のマシンで動作するスタンドアロン版でさえ、数千万のベクターを処理できるため、初期段階の導入に適している。
注:本記事の例では、ローカルでの開発およびテストのために、Milvus LiteまたはMilvus Standaloneを使用しています。
Long-TermMemoryを搭載したMilvusエージェントの構築
このセクションでは、簡単なテクニカルサポートエージェントを構築します。ユーザが質問をすると、エージェントは同じ作業を繰り返すのではなく、類似した過去のサポートチケットを検索して回答します。
この例は、実際のエージェントメモリシステムが扱わなければならない3つの一般的な問題を示しているので有用です。
- セッションをまたいだ長期記憶
今日の質問は、数週間前に作成されたチケットに関連しているかもしれません。エージェントは、現在のセッション内だけでなく、会話をまたいで情報を記憶しなければなりません。これが、MemoryServiceを通して管理される長期メモリが必要とされる理由です。
- ユーザの分離
各ユーザのサポート履歴は非公開でなければなりません。あるユーザのデータが他のユーザの結果に表示されることはありません。そのため、user_idのようなフィールドでフィルタリングを行う必要がありますが、Milvusはハイブリッドクエリでこれをサポートしています。
- セマンティックマッチング
ユーザーは同じ問題を "接続できない "や "タイムアウト "など異なる方法で表現します。キーワードマッチングだけでは十分ではありません。エージェントには、ベクトル検索によって提供されるセマンティック検索が必要です。
環境セットアップ
- Python 3.11+
- DockerとDocker Compose
- Gemini APIキー
このセクションでは、プログラムが正しく実行できることを確認するための基本的なセットアップについて説明する。
pip install google-adk pymilvus google-generativeai
"""
ADK + Milvus + Gemini Long-term Memory Agent
Demonstrates how to implement a cross-session memory recall system
"""
import os
import asyncio
import time
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility
import google.generativeai as genai
from google.adk.agents import Agent
from google.adk.tools import FunctionTool
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
ステップ1: Milvus Standaloneのデプロイ(Docker)
(1) 配備用ファイルをダウンロードする。
wget <https://github.com/Milvus-io/Milvus/releases/download/v2.5.12/Milvus-standalone-docker-compose.yml> -O docker-compose.yml
(2) Milvusサービスを起動する。
docker-compose up -d
docker-compose ps -a
ステップ2 モデルと接続の設定
Gemini APIとMilvusの接続設定を行う。
# ==================== Configuration ====================
# 1. Gemini API configuration
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
raise ValueError("Please set the GOOGLE_API_KEY environment variable")
genai.configure(api_key=GOOGLE_API_KEY)
# 2. Milvus connection configuration
MILVUS_HOST = os.getenv("MILVUS_HOST", "localhost")
MILVUS_PORT = os.getenv("MILVUS_PORT", "19530")
# 3. Model selection (best combination within the free tier limits)
LLM_MODEL = "gemini-2.5-flash-lite" # LLM model: 1000 RPD
EMBEDDING_MODEL = "models/text-embedding-004" # Embedding model: 1000 RPD
EMBEDDING_DIM = 768 # Vector dimension
# 4. Application configuration
APP_NAME = "tech_support"
USER_ID = "user_123"
print(f"✓ Using model configuration:")
print(f" LLM: {LLM_MODEL}")
print(f" Embedding: {EMBEDDING_MODEL} (dimension: {EMBEDDING_DIM})")
ステップ3 Milvusデータベースの初期化
ベクトルデータベースコレクションを作成する(リレーショナルデータベースのテーブルに似ている)
# ==================== Initialize Milvus ====================
def init_milvus():
"""Initialize Milvus connection and collection"""
# Step 1: Establish connection
Try:
connections.connect(
alias="default",
host=MILVUS_HOST,
port=MILVUS_PORT
)
print(f"✓ Connected to Milvus: {MILVUS_HOST}:{MILVUS_PORT}")
except Exception as e:
print(f"✗ Failed to connect to Milvus: {e}")
print("Hint: make sure Milvus is running")
Raise
# Step 2: Define data schema
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="user_id", dtype=DataType.VARCHAR, max_length=100),
FieldSchema(name="session_id", dtype=DataType.VARCHAR, max_length=100),
FieldSchema(name="question", dtype=DataType.VARCHAR, max_length=2000),
FieldSchema(name="solution", dtype=DataType.VARCHAR, max_length=5000),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=EMBEDDING_DIM),
FieldSchema(name="timestamp", dtype=DataType.INT64)
]
schema = CollectionSchema(fields, description="Tech support memory")
collection_name = "support_memory"
# Step 3: Create or load the collection
if utility.has_collection(collection_name):
memory_collection = Collection(name=collection_name)
print(f"✓ Collection '{collection_name}' already exists")
Else:
memory_collection = Collection(name=collection_name, schema=schema)
# Step 4: Create vector index
index_params = {
"index_type": "IVF_FLAT",
"metric_type": "COSINE",
"params": {"nlist": 128}
}
memory_collection.create_index(field_name="embedding", index_params=index_params)
print(f"✓ Created collection '{collection_name}' and index")
return memory_collection
# Run initialization
memory_collection = init_milvus()
ステップ4 メモリ操作関数
エージェント用のツールとして、記憶と検索のロジックをカプセル化します。
(1) ストアメモリ機能
# ==================== Memory Operation Functions ====================
def store_memory(question: str, solution: str) -> str:
"""
Store a solution record into the memory store
Args:
question: the user's question
solution: the solution
Returns:
str: result message
"""
Try:
print(f"\\n[Tool Call] store_memory")
print(f" - question: {question[:50]}...")
print(f" - solution: {solution[:50]}...")
# Use global USER_ID (in production, this should come from ToolContext)
user_id = USER_ID
session_id = f"session_{int(time.time())}"
# Key step 1: convert the question into a 768-dimensional vector
embedding_result = genai.embed_content(
model=EMBEDDING_MODEL,
content=question,
task_type="retrieval_document", # specify document indexing task
output_dimensionality=EMBEDDING_DIM
)
embedding = embedding_result["embedding"]
# Key step 2: insert into Milvus
memory_collection.insert([{
"user_id": user_id,
"session_id": session_id,
"question": question,
"solution": solution,
"embedding": embedding,
"timestamp": int(time.time())
}])
# Key step 3: flush to disk (ensure data persistence)
memory_collection.flush()
result = "✓ Successfully stored in memory"
print(f"[Tool Result] {result}")
return result
except Exception as e:
error_msg = f"✗ Storage failed: {str(e)}"
print(f"[Tool Error] {error_msg}")
return error_msg
(2) メモリ検索機能
def recall_memory(query: str, top_k: int = 3) -> str:
"""
Retrieve relevant historical cases from the memory store
Args:
query: query question
top_k: number of most similar results to return
Returns:
str: retrieval result
"""
Try:
print(f"\\n[Tool Call] recall_memory")
print(f" - query: {query}")
print(f" - top_k: {top_k}")
user_id = USER_ID
# Key step 1: convert the query into a vector
embedding_result = genai.embed_content(
model=EMBEDDING_MODEL,
content=query,
task_type="retrieval_query", # specify query task (different from indexing)
output_dimensionality=EMBEDDING_DIM
)
query_embedding = embedding_result["embedding"]
# Key step 2: load the collection into memory (required for the first query)
memory_collection.load()
# Key step 3: hybrid search (vector similarity + scalar filtering)
results = memory_collection.search(
data=[query_embedding],
anns_field="embedding",
param={"metric_type": "COSINE", "params": {"nprobe": 10}},
limit=top_k,
expr=f'user_id == "{user_id}"', # 🔑 key to user isolation
output_fields=["question", "solution", "timestamp"]
)
# Key step 4: format results
if not results[0]:
result = "No relevant historical cases found"
print(f"[Tool Result] {result}")
return result
result_text = f"Found {len(results[0])} relevant cases:\\n\\n"
for i, hit in enumerate(results[0]):
result_text += f"Case {i+1} (similarity: {hit.score:.2f}):\\n"
result_text += f"Question: {hit.entity.get('question')}\\n"
result_text += f"Solution: {hit.entity.get('solution')}\\n\\n"
print(f"[Tool Result] Found {len(results[0])} cases")
return result_text
except Exception as e:
error_msg = f"Retrieval failed: {str(e)}"
print(f"[Tool Error] {error_msg}")
return error_msg
(3) ADK ツールとして登録
# Usage
# Wrap functions with FunctionTool
store_memory_tool = FunctionTool(func=store_memory)
recall_memory_tool = FunctionTool(func=recall_memory)
memory_tools = [store_memory_tool, recall_memory_tool]
ステップ 5 エージェント定義
コアアイデア:エージェントの動作ロジックを定義する。
# ==================== Create Agent ====================
support_agent = Agent(
model=LLM_MODEL,
name="support_agent",
description="Technical support expert agent that can remember and recall historical cases",
# Key: the instruction defines the agent’s behavior
instruction="""
You are a technical support expert. Strictly follow the process below:
<b>When the user asks a technical question:</b>
1. Immediately call the recall_memory tool to search for historical cases
- Parameter query: use the user’s question text directly
- Do not ask for any additional information; call the tool directly
2. Answer based on the retrieval result:
- If relevant cases are found: explain that similar historical cases were found and answer by referencing their solutions
- If no cases are found: explain that this is a new issue and answer based on your own knowledge
3. After answering, ask: “Did this solution resolve your issue?”
<b>When the user confirms the issue is resolved:</b>
- Immediately call the store_memory tool to save this Q&A
- Parameter question: the user’s original question
- Parameter solution: the complete solution you provided
<b>Important rules:</b>
- You must call a tool before answering
- Do not ask for user_id or any other parameters
- Only store memory when you see confirmation phrases such as “resolved”, “it works”, or “thanks”
""",
tools=memory_tools
)
ステップ 6 メインプログラムと実行フロー
セッションをまたいだメモリ検索の完全なプロセスを示します。
# ==================== Main Program ====================
async def main():
"""Demonstrate cross-session memory recall"""
# Create Session service and Runner
session_service = InMemorySessionService()
runner = Runner(
agent=support_agent,
app_name=APP_NAME,
session_service=session_service
)
# ========== First round: build memory ==========
print("\\n" + "=" \* 60)
print("First conversation: user asks a question and the solution is stored")
print("=" \* 60)
session1 = await session_service.create_session(
app_name=APP_NAME,
user_id=USER_ID,
session_id="session_001"
)
# User asks the first question
print("\\n[User]: What should I do if Milvus connection times out?")
content1 = types.Content(
role='user',
parts=[types.Part(text="What should I do if Milvus connection times out?")]
)
async for event in runner.run_async(
user_id=USER_ID,
session_id=[session1.id](http://session1.id),
new_message=content1
):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, 'text') and part.text:
print(f"[Agent]: {part.text}")
# User confirms the issue is resolved
print("\\n[User]: The issue is resolved, thanks!")
content2 = types.Content(
role='user',
parts=[types.Part(text="The issue is resolved, thanks!")]
)
async for event in runner.run_async(
user_id=USER_ID,
session_id=[session1.id](http://session1.id),
new_message=content2
):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, 'text') and part.text:
print(f"[Agent]: {part.text}")
# ========== Second round: recall memory ==========
print("\\n" + "=" \* 60)
print("Second conversation: new session with memory recall")
print("=" \* 60)
session2 = await session_service.create_session(
app_name=APP_NAME,
user_id=USER_ID,
session_id="session_002"
)
# User asks a similar question in a new session
print("\\n[User]: Milvus can't connect")
content3 = types.Content(
role='user',
parts=[types.Part(text="Milvus can't connect")]
)
async for event in runner.run_async(
user_id=USER_ID,
session_id=[session2.id](http://session2.id),
new_message=content3
):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, 'text') and part.text:
print(f"[Agent]: {part.text}")
# Program entry point
if name == "main":
Try:
asyncio.run(main())
except KeyboardInterrupt:
print(“\n\nProgram exited”)
except Exception as e:
print(f"\n\nProgram error: {e}")
import traceback
traceback.print_exc()
Finally:
Try:
connections.disconnect(alias=“default”)
print(“\n✓ Disconnected from Milvus”)
Except:
pass
ステップ 7 実行とテスト
(1) 環境変数の設定
export GOOGLE_API_KEY="your-gemini-api-key"
python milvus_agent.py
期待される出力
出力はメモリシステムが実際にどのように動作するかを示している。
最初の会話では、ユーザがMilvus接続のタイムアウトをどのように扱うかを尋ねている。エージェントは解決策を示す。ユーザが問題が解決したことを確認した後、エージェントはこの質問と回答をメモリに保存します。
2番目の会話では、新しいセッションが始まります。ユーザは、異なる単語で同じ質問をします:「Milvusは接続できません。エージェントは自動的にメモリから同様のケースを検索し、同じ解決策を与える。
手作業は必要ない。エージェントは、過去のケースをいつ検索し、新しいケースをいつ保存するかを決定し、3つの重要な能力を示す:クロスセッションメモリー、セマンティックマッチング、ユーザーアイソレーション。
結論
ADKは、SessionServiceとMemoryServiceを使って、フレームワークレベルで短期コンテキストと長期記憶を分離する。milvusは、ベクトルベースの検索を通して、意味検索とユーザーレベルのフィルタリングを扱う。
フレームワークを選択する際には、ゴールが重要になる。強力な状態の分離、監査可能性、運用の安定性が必要であれば、ADKの方が適している。プロトタイピングや実験であれば、LangChain(LLMベースのアプリケーションやエージェントを素早く構築するための人気のPythonフレームワーク)の方が柔軟性があります。
エージェントのメモリで重要なのはデータベースです。セマンティックメモリは、どのフレームワークを使っても、ベクターデータベースに依存します。Milvusはオープンソースであり、ローカルで動作し、1台のマシンで数十億のベクトルを扱うことができ、ハイブリッドベクトル、スカラー、全文検索をサポートしている。これらの特徴は、初期のテストと本番使用の両方をカバーしている。
この記事が、エージェント・メモリの設計について理解を深め、プロジェクトに適したツールを選択する一助となれば幸いである。
より大きなコンテキスト・ウィンドウだけでなく、実際のメモリを必要とするAIエージェントを構築しているのであれば、どのように取り組んでいるのかぜひお聞かせください。
ADK、エージェントのメモリ設計、またはMilvusをメモリバックエンドとして使用することについて質問がありますか?Slackチャンネルにご参加いただくか、Milvusオフィスアワーの20分セッションをご予約ください。
Try Managed Milvus for Free
Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.
Get StartedLike the article? Spread the word



