Milvus
Zilliz
  • Home
  • Blog
  • Google ADKとmilvusを使った長期記憶機能を持つAIエージェントの作り方

Google ADKとmilvusを使った長期記憶機能を持つAIエージェントの作り方

  • Tutorials
February 26, 2026
Min Yin

インテリジェント・エージェントを構築する際、最も難しい問題のひとつがメモリー管理である。

すべてのメモリが長持ちするわけではありません。あるデータは現在の会話にのみ必要であり、会話が終了したら消去されるべきです。他のデータは、ユーザー設定のように、会話をまたいで持続しなければなりません。これらが混在すると、一時的なデータが積み重なり、重要な情報が失われてしまう。

本当の問題はアーキテクチャにある。ほとんどのフレームワークでは、短期メモリと長期メモリの明確な分離が強制されておらず、開発者はそれを手作業で処理しなければならない。

グーグルが2025年にリリースしたオープンソースのAgent Development Kit(ADK)は、メモリ管理を第一級の関心事にすることで、フレームワークレベルでこれに取り組んでいる。短期的なセッション・メモリと長期的なメモリをデフォルトで分離している。

この記事では、この分離が実際にどのように機能するかを見ていこう。Milvusをベクタデータベースとして使用し、実際の長期メモリを持つ量産可能なエージェントをゼロから構築します。

ADKのコア設計原則

ADKは、開発者の負担を軽減するために設計されています。フレームワークは、短期的なセッションデータと長期的なメモリを自動的に分離し、それぞれを適切に処理します。これは、4つのコアとなる設計の選択によって実現されます。

短期および長期メモリ用の組み込みインタフェース

すべての ADK エージェントには、メモリを管理するための 2 つの組み込みインタフェースが付属しています:

SessionService (一時データ)

  • 保存するもの: 現在の会話内容とツール呼び出しからの中間結果
  • クリアされるタイミング: セッション終了時に自動的にクリアされる
  • 保存場所:メモリ(最速)、データベース、クラウドサービス

MemoryService(長期記憶)

  • 保存されるもの:ユーザー設定や過去の記録など、記憶されるべき情報
  • 消去時期: 自動的には消去されない。手動で削除する必要がある。
  • どこに保存されるか:ADKが定義するのはインターフェイスのみで、ストレージのバックエンドはユーザー次第(例えばmilvus)。

3層のアーキテクチャ

ADKはシステムを3つのレイヤーに分け、それぞれに責任を持たせる:

  • エージェント層:"ユーザーに応答する前に関連メモリを検索する "などのビジネスロジックが存在する。
  • ランタイム・レイヤー:フレームワークによって管理され、セッションの作成と破棄、実行の各ステップの追跡を担当。
  • サービスレイヤー:Milvusのようなベクターデータベースや大規模なモデルAPIのような外部システムと統合します。

この構造により、ビジネスロジックはエージェント内に、ストレージは別の場所に格納されます。ビジネスロジックはエージェントにあり、ストレージは別の場所にあります。

すべてがイベントとして記録される

エージェントが行うすべてのアクション(記憶呼び出しツールの呼び出し、モデルの起動、レスポンスの生成)は、イベントとして記録されます。

これには2つの実用的な利点があります。第一に、何か問題が発生したとき、開発者はインタラクション全体をステップごとに再生し、正確な障害ポイントを見つけることができます。第二に、監査とコンプライアンスのために、システムは各ユーザー・インタラクションの完全な実行トレースを提供する。

プレフィックス・ベースのデータ・スコープ

ADKは、単純なキー接頭辞を使用してデータの可視性を制御します:

  • temp:xxx- 現在のセッション内でのみ表示され、セッションが終了すると自動的に削除されます。
  • user:xxx- 同一ユーザーの全セッションで共有され、永続的なユーザー設定が可能。
  • app:xxx- 全ユーザでグローバルに共有され、製品ドキュメントのようなアプリケーション全体のナレッジに適しています。

プレフィックスを使用することで、開発者は余分なアクセスロジックを記述することなくデータスコープを制御できます。フレームワークは可視性とライフタイムを自動的に処理します。

ADKのメモリバックエンドとしてのMilvus

ADKでは、MemoryServiceは単なるインターフェースです。長期メモリがどのように使用されるかは定義されているが、どのように保存されるかは定義されていない。データベースの選択は開発者次第だ。では、どのようなデータベースがエージェントのメモリバックエンドとしてうまく機能するのでしょうか?

エージェントの記憶システムに必要なもの-そしてmilvusの実現方法

  • 意味検索

必要性

ユーザが同じ質問を同じようにすることは稀です。「接続できない "と "接続タイムアウト "は同じ意味です。記憶システムはキーワードにマッチするだけでなく、意味を理解しなければならない。

Milvusはそれをどのように満たすか

MilvusはHNSWやDiskANNなど多くのベクトルインデックスをサポートしており、開発者はワークロードに合ったものを選択することができます。数千万のベクトルでも、クエリのレイテンシは10ミリ秒以下であり、エージェントの使用には十分な速度である。

  • ハイブリッドクエリー

必要性

記憶を呼び起こすには、セマンティック検索以上のものが必要です。システムは、user_idのような構造化フィールドでフィルタリングし、現在のユーザーのデータのみを返す必要がある。

Milvusの対応方法

Milvusはベクトル検索とスカラーフィルタリングを組み合わせたハイブリッドクエリをネイティブでサポートしている。例えば、同じクエリ内でuser_id = 'xxx'のようなフィルタを適用しながら、パフォーマンスや想起品質を損なうことなく、意味的に類似したレコードを検索することができます。

  • スケーラビリティ

必要性

ユーザー数や保存メモリが増加するにつれて、システムはスムーズにスケールしなければならない。突然の速度低下や障害が発生することなく、データが増加しても安定したパフォーマンスを維持する必要があります。

Milvusはどのようにそれを満たすか

Milvusは計算とストレージを分離したアーキテクチャを採用している。必要に応じてクエリーノードを追加することで、クエリー容量を水平方向に拡張することができる。1台のマシンで動作するスタンドアロン版でさえ、数千万のベクターを処理できるため、初期段階の導入に適している。

注:本記事の例では、ローカルでの開発およびテストのために、Milvus LiteまたはMilvus Standaloneを使用しています。

Long-TermMemoryを搭載したMilvusエージェントの構築

このセクションでは、簡単なテクニカルサポートエージェントを構築します。ユーザが質問をすると、エージェントは同じ作業を繰り返すのではなく、類似した過去のサポートチケットを検索して回答します。

この例は、実際のエージェントメモリシステムが扱わなければならない3つの一般的な問題を示しているので有用です。

  • セッションをまたいだ長期記憶

今日の質問は、数週間前に作成されたチケットに関連しているかもしれません。エージェントは、現在のセッション内だけでなく、会話をまたいで情報を記憶しなければなりません。これが、MemoryServiceを通して管理される長期メモリが必要とされる理由です。

  • ユーザの分離

各ユーザのサポート履歴は非公開でなければなりません。あるユーザのデータが他のユーザの結果に表示されることはありません。そのため、user_idのようなフィールドでフィルタリングを行う必要がありますが、Milvusはハイブリッドクエリでこれをサポートしています。

  • セマンティックマッチング

ユーザーは同じ問題を "接続できない "や "タイムアウト "など異なる方法で表現します。キーワードマッチングだけでは十分ではありません。エージェントには、ベクトル検索によって提供されるセマンティック検索が必要です。

環境セットアップ

  • Python 3.11+
  • DockerとDocker Compose
  • Gemini APIキー

このセクションでは、プログラムが正しく実行できることを確認するための基本的なセットアップについて説明する。

pip install google-adk pymilvus google-generativeai  
"""  
ADK + Milvus + Gemini Long-term Memory Agent  
Demonstrates how to implement a cross-session memory recall system  
"""  
import os  
import asyncio  
import time  
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility  
import google.generativeai as genai  
from google.adk.agents import Agent  
from google.adk.tools import FunctionTool  
from google.adk.runners import Runner  
from google.adk.sessions import InMemorySessionService  
from google.genai import types  

ステップ1: Milvus Standaloneのデプロイ(Docker)

(1) 配備用ファイルをダウンロードする。

wget <https://github.com/Milvus-io/Milvus/releases/download/v2.5.12/Milvus-standalone-docker-compose.yml> -O docker-compose.yml  

(2) Milvusサービスを起動する。

docker-compose up -d  
docker-compose ps -a  

ステップ2 モデルと接続の設定

Gemini APIとMilvusの接続設定を行う。

# ==================== Configuration ====================  
# 1. Gemini API configuration  
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")  
if not GOOGLE_API_KEY:  
   raise ValueError("Please set the GOOGLE_API_KEY environment variable")  
genai.configure(api_key=GOOGLE_API_KEY)  
# 2. Milvus connection configuration  
MILVUS_HOST = os.getenv("MILVUS_HOST", "localhost")  
MILVUS_PORT = os.getenv("MILVUS_PORT", "19530")  
# 3. Model selection (best combination within the free tier limits)  
LLM_MODEL = "gemini-2.5-flash-lite"  # LLM model: 1000 RPD  
EMBEDDING_MODEL = "models/text-embedding-004"  # Embedding model: 1000 RPD  
EMBEDDING_DIM = 768  # Vector dimension  
# 4. Application configuration  
APP_NAME = "tech_support"  
USER_ID = "user_123"  
print(f"✓ Using model configuration:")  
print(f"  LLM: {LLM_MODEL}")  
print(f"  Embedding: {EMBEDDING_MODEL} (dimension: {EMBEDDING_DIM})")  

ステップ3 Milvusデータベースの初期化

ベクトルデータベースコレクションを作成する(リレーショナルデータベースのテーブルに似ている)

# ==================== Initialize Milvus ====================  
def init_milvus():  
   """Initialize Milvus connection and collection"""  
   # Step 1: Establish connection  
   Try:  
       connections.connect(  
           alias="default",  
           host=MILVUS_HOST,  
           port=MILVUS_PORT  
       )  
       print(f"✓ Connected to Milvus: {MILVUS_HOST}:{MILVUS_PORT}")  
   except Exception as e:  
       print(f"✗ Failed to connect to Milvus: {e}")  
       print("Hint: make sure Milvus is running")  
       Raise  
   # Step 2: Define data schema  
   fields = [  
       FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),  
       FieldSchema(name="user_id", dtype=DataType.VARCHAR, max_length=100),  
       FieldSchema(name="session_id", dtype=DataType.VARCHAR, max_length=100),  
       FieldSchema(name="question", dtype=DataType.VARCHAR, max_length=2000),  
       FieldSchema(name="solution", dtype=DataType.VARCHAR, max_length=5000),  
       FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=EMBEDDING_DIM),  
       FieldSchema(name="timestamp", dtype=DataType.INT64)  
   ]  
   schema = CollectionSchema(fields, description="Tech support memory")  
   collection_name = "support_memory"  
   # Step 3: Create or load the collection  
   if utility.has_collection(collection_name):  
       memory_collection = Collection(name=collection_name)  
       print(f"✓ Collection '{collection_name}' already exists")  
   Else:  
       memory_collection = Collection(name=collection_name, schema=schema)  
   # Step 4: Create vector index  
   index_params = {  
       "index_type": "IVF_FLAT",  
       "metric_type": "COSINE",  
       "params": {"nlist": 128}  
   }  
   memory_collection.create_index(field_name="embedding", index_params=index_params)  
   print(f"✓ Created collection '{collection_name}' and index")  
   return memory_collection  
# Run initialization  
memory_collection = init_milvus()  

ステップ4 メモリ操作関数

エージェント用のツールとして、記憶と検索のロジックをカプセル化します。

(1) ストアメモリ機能

# ==================== Memory Operation Functions ====================  
def store_memory(question: str, solution: str) -> str:  
   """  
   Store a solution record into the memory store  
   Args:  
       question: the user's question  
       solution: the solution  
   Returns:  
       str: result message  
   """  
   Try:  
       print(f"\\n[Tool Call] store_memory")  
       print(f" - question: {question[:50]}...")  
       print(f" - solution: {solution[:50]}...")  
       # Use global USER_ID (in production, this should come from ToolContext)  
       user_id = USER_ID  
       session_id = f"session_{int(time.time())}"  
       # Key step 1: convert the question into a 768-dimensional vector  
       embedding_result = genai.embed_content(  
           model=EMBEDDING_MODEL,  
           content=question,  
           task_type="retrieval_document",  # specify document indexing task  
           output_dimensionality=EMBEDDING_DIM  
       )  
       embedding = embedding_result["embedding"]  
       # Key step 2: insert into Milvus  
       memory_collection.insert([{  
           "user_id": user_id,  
           "session_id": session_id,  
           "question": question,  
           "solution": solution,  
           "embedding": embedding,  
           "timestamp": int(time.time())  
       }])  
       # Key step 3: flush to disk (ensure data persistence)  
       memory_collection.flush()  
       result = "✓ Successfully stored in memory"  
       print(f"[Tool Result] {result}")  
       return result  
   except Exception as e:  
       error_msg = f"✗ Storage failed: {str(e)}"  
       print(f"[Tool Error] {error_msg}")  
       return error_msg  

(2) メモリ検索機能

def recall_memory(query: str, top_k: int = 3) -> str:  
   """  
   Retrieve relevant historical cases from the memory store  
   Args:  
       query: query question  
       top_k: number of most similar results to return  
   Returns:  
       str: retrieval result  
   """  
   Try:  
       print(f"\\n[Tool Call] recall_memory")  
       print(f" - query: {query}")  
       print(f" - top_k: {top_k}")  
       user_id = USER_ID  
       # Key step 1: convert the query into a vector  
       embedding_result = genai.embed_content(  
           model=EMBEDDING_MODEL,  
           content=query,  
           task_type="retrieval_query",  # specify query task (different from indexing)  
           output_dimensionality=EMBEDDING_DIM  
       )  
       query_embedding = embedding_result["embedding"]  
       # Key step 2: load the collection into memory (required for the first query)  
       memory_collection.load()  
       # Key step 3: hybrid search (vector similarity + scalar filtering)  
       results = memory_collection.search(  
           data=[query_embedding],  
           anns_field="embedding",  
           param={"metric_type": "COSINE", "params": {"nprobe": 10}},  
           limit=top_k,  
           expr=f'user_id == "{user_id}"',  # 🔑 key to user isolation  
           output_fields=["question", "solution", "timestamp"]  
       )  
       # Key step 4: format results  
       if not results[0]:  
           result = "No relevant historical cases found"  
           print(f"[Tool Result] {result}")  
           return result  
       result_text = f"Found {len(results[0])} relevant cases:\\n\\n"  
       for i, hit in enumerate(results[0]):  
           result_text += f"Case {i+1} (similarity: {hit.score:.2f}):\\n"  
           result_text += f"Question: {hit.entity.get('question')}\\n"  
           result_text += f"Solution: {hit.entity.get('solution')}\\n\\n"  
       print(f"[Tool Result] Found {len(results[0])} cases")  
       return result_text  
   except Exception as e:  
       error_msg = f"Retrieval failed: {str(e)}"  
       print(f"[Tool Error] {error_msg}")  
       return error_msg  

(3) ADK ツールとして登録

# Usage  
# Wrap functions with FunctionTool  
store_memory_tool = FunctionTool(func=store_memory)  
recall_memory_tool = FunctionTool(func=recall_memory)  
memory_tools = [store_memory_tool, recall_memory_tool]  

ステップ 5 エージェント定義

コアアイデア:エージェントの動作ロジックを定義する。

# ==================== Create Agent ====================  
support_agent = Agent(  
   model=LLM_MODEL,  
   name="support_agent",  
   description="Technical support expert agent that can remember and recall historical cases",  
   # Key: the instruction defines the agent’s behavior  
   instruction="""  
You are a technical support expert. Strictly follow the process below:  
<b>When the user asks a technical question:</b>  
1. Immediately call the recall_memory tool to search for historical cases  
  - Parameter query: use the user’s question text directly  
  - Do not ask for any additional information; call the tool directly  
2. Answer based on the retrieval result:  
  - If relevant cases are found: explain that similar historical cases were found and answer by referencing their solutions  
  - If no cases are found: explain that this is a new issue and answer based on your own knowledge  
3. After answering, ask: “Did this solution resolve your issue?”  
<b>When the user confirms the issue is resolved:</b>  
- Immediately call the store_memory tool to save this Q&A  
- Parameter question: the user’s original question  
- Parameter solution: the complete solution you provided  
<b>Important rules:</b>  
- You must call a tool before answering  
- Do not ask for user_id or any other parameters  
- Only store memory when you see confirmation phrases such as “resolved”, “it works”, or “thanks”  
""",  
   tools=memory_tools  
)  

ステップ 6 メインプログラムと実行フロー

セッションをまたいだメモリ検索の完全なプロセスを示します。

# ==================== Main Program ====================  
async def main():  
   """Demonstrate cross-session memory recall"""  
   # Create Session service and Runner  
   session_service = InMemorySessionService()  
   runner = Runner(  
       agent=support_agent,  
       app_name=APP_NAME,  
       session_service=session_service  
   )  
   # ========== First round: build memory ==========  
   print("\\n" + "=" \* 60)  
   print("First conversation: user asks a question and the solution is stored")  
   print("=" \* 60)  
   session1 = await session_service.create_session(  
       app_name=APP_NAME,  
       user_id=USER_ID,  
       session_id="session_001"  
   )  
   # User asks the first question  
   print("\\n[User]: What should I do if Milvus connection times out?")  
   content1 = types.Content(  
       role='user',  
       parts=[types.Part(text="What should I do if Milvus connection times out?")]  
   )  
   async for event in runner.run_async(  
       user_id=USER_ID,  
       session_id=[session1.id](http://session1.id),  
       new_message=content1  
   ):  
       if event.content and event.content.parts:  
           for part in event.content.parts:  
               if hasattr(part, 'text') and part.text:  
                   print(f"[Agent]: {part.text}")  
   # User confirms the issue is resolved  
   print("\\n[User]: The issue is resolved, thanks!")  
   content2 = types.Content(  
       role='user',  
       parts=[types.Part(text="The issue is resolved, thanks!")]  
   )  
   async for event in runner.run_async(  
       user_id=USER_ID,  
       session_id=[session1.id](http://session1.id),  
       new_message=content2  
   ):  
       if event.content and event.content.parts:  
           for part in event.content.parts:  
               if hasattr(part, 'text') and part.text:  
                   print(f"[Agent]: {part.text}")  
   # ========== Second round: recall memory ==========  
   print("\\n" + "=" \* 60)  
   print("Second conversation: new session with memory recall")  
   print("=" \* 60)  
   session2 = await session_service.create_session(  
       app_name=APP_NAME,  
       user_id=USER_ID,  
       session_id="session_002"  
   )  
   # User asks a similar question in a new session  
   print("\\n[User]: Milvus can't connect")  
   content3 = types.Content(  
       role='user',  
       parts=[types.Part(text="Milvus can't connect")]  
   )  
   async for event in runner.run_async(  
       user_id=USER_ID,  
       session_id=[session2.id](http://session2.id),  
       new_message=content3  
   ):  
       if event.content and event.content.parts:  
           for part in event.content.parts:  
               if hasattr(part, 'text') and part.text:  
                   print(f"[Agent]: {part.text}")

# Program entry point if name == "main":
Try:
asyncio.run(main())
except KeyboardInterrupt:
print(“\n\nProgram exited”)
except Exception as e:
print(f"\n\nProgram error: {e}")
import traceback
traceback.print_exc()
Finally:
Try:
connections.disconnect(alias=“default”)
print(“\n✓ Disconnected from Milvus”)
Except:
pass

ステップ 7 実行とテスト

(1) 環境変数の設定

export GOOGLE_API_KEY="your-gemini-api-key"  
python milvus_agent.py  

期待される出力

出力はメモリシステムが実際にどのように動作するかを示している。

最初の会話では、ユーザがMilvus接続のタイムアウトをどのように扱うかを尋ねている。エージェントは解決策を示す。ユーザが問題が解決したことを確認した後、エージェントはこの質問と回答をメモリに保存します。

2番目の会話では、新しいセッションが始まります。ユーザは、異なる単語で同じ質問をします:「Milvusは接続できません。エージェントは自動的にメモリから同様のケースを検索し、同じ解決策を与える。

手作業は必要ない。エージェントは、過去のケースをいつ検索し、新しいケースをいつ保存するかを決定し、3つの重要な能力を示す:クロスセッションメモリー、セマンティックマッチング、ユーザーアイソレーション。

結論

ADKは、SessionServiceとMemoryServiceを使って、フレームワークレベルで短期コンテキストと長期記憶を分離する。milvusは、ベクトルベースの検索を通して、意味検索とユーザーレベルのフィルタリングを扱う。

フレームワークを選択する際には、ゴールが重要になる。強力な状態の分離、監査可能性、運用の安定性が必要であれば、ADKの方が適している。プロトタイピングや実験であれば、LangChain(LLMベースのアプリケーションやエージェントを素早く構築するための人気のPythonフレームワーク)の方が柔軟性があります。

エージェントのメモリで重要なのはデータベースです。セマンティックメモリは、どのフレームワークを使っても、ベクターデータベースに依存します。Milvusはオープンソースであり、ローカルで動作し、1台のマシンで数十億のベクトルを扱うことができ、ハイブリッドベクトル、スカラー、全文検索をサポートしている。これらの特徴は、初期のテストと本番使用の両方をカバーしている。

この記事が、エージェント・メモリの設計について理解を深め、プロジェクトに適したツールを選択する一助となれば幸いである。

より大きなコンテキスト・ウィンドウだけでなく、実際のメモリを必要とするAIエージェントを構築しているのであれば、どのように取り組んでいるのかぜひお聞かせください。

ADK、エージェントのメモリ設計、またはMilvusをメモリバックエンドとして使用することについて質問がありますか?Slackチャンネルにご参加いただくか、Milvusオフィスアワーの20分セッションをご予約ください。

    Try Managed Milvus for Free

    Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

    Get Started

    Like the article? Spread the word

    続けて読む