Wie man mit Google ADK und Milvus produktionsreife KI-Agenten mit Langzeitspeicher erstellt
Bei der Entwicklung intelligenter Agenten ist eines der schwierigsten Probleme die Speicherverwaltung: die Entscheidung, was der Agent sich merken und was er vergessen soll.
Nicht jeder Speicher ist für die Ewigkeit gedacht. Einige Daten werden nur für das aktuelle Gespräch benötigt und sollten nach dessen Beendigung gelöscht werden. Andere Daten, wie z. B. Benutzereinstellungen, müssen über mehrere Gespräche hinweg erhalten bleiben. Wenn diese Daten vermischt werden, stapeln sich die temporären Daten, und wichtige Informationen gehen verloren.
Das eigentliche Problem liegt in der Architektur. Die meisten Frameworks sorgen nicht für eine klare Trennung zwischen Kurzzeit- und Langzeitspeicher, so dass die Entwickler dies manuell erledigen müssen.
Das Open-Source Agent Development Kit (ADK) von Google, das 2025 veröffentlicht wurde, geht dieses Problem auf der Framework-Ebene an, indem es die Speicherverwaltung zu einem Thema erster Klasse macht. Es erzwingt eine Standardtrennung zwischen Kurzzeit-Sitzungsspeicher und Langzeitspeicher.
In diesem Artikel werden wir uns ansehen, wie diese Trennung in der Praxis funktioniert. Unter Verwendung von Milvus als Vektordatenbank werden wir einen produktionsreifen Agenten mit echtem Langzeitspeicher von Grund auf aufbauen.
ADKs grundlegende Designprinzipien
ADK ist so konzipiert, dass die Speicherverwaltung dem Entwickler abgenommen wird. Das Framework trennt automatisch kurzfristige Sitzungsdaten vom Langzeitspeicher und behandelt beide entsprechend. Dies geschieht durch vier zentrale Designentscheidungen.
Eingebaute Schnittstellen für Kurz- und Langzeitspeicher
Jeder ADK-Agent verfügt über zwei eingebaute Schnittstellen für die Speicherverwaltung:
SessionService (temporäre Daten)
- Was er speichert: aktuelle Gesprächsinhalte und Zwischenergebnisse von Tool-Aufrufen
- Wann wird er gelöscht: automatisch bei Beendigung der Sitzung
- Wo wird er gespeichert: im Speicher (am schnellsten), in einer Datenbank oder in einem Cloud-Dienst
MemoryService (Langzeitspeicher)
- Was wird gespeichert: Informationen, die gespeichert werden sollen, wie z. B. Benutzereinstellungen oder frühere Datensätze
- Wann wird er gelöscht: nicht automatisch; muss manuell gelöscht werden
- Wo er gespeichert wird: ADK definiert nur die Schnittstelle; das Speicher-Backend ist Ihnen überlassen (z. B. Milvus)
Eine dreischichtige Architektur
Das ADK unterteilt das System in drei Schichten, die jeweils für eine bestimmte Aufgabe zuständig sind:
- Agentenschicht: Hier befindet sich die Geschäftslogik, z. B. "Abrufen des relevanten Speichers, bevor dem Benutzer geantwortet wird".
- Laufzeitschicht: Sie wird vom Framework verwaltet und ist für die Erstellung und den Abbau von Sitzungen und die Verfolgung jedes Ausführungsschritts verantwortlich.
- Serviceschicht: Integration mit externen Systemen, z. B. Vektordatenbanken wie Milvus oder große Modell-APIs.
Diese Struktur sorgt für eine Trennung der Bereiche: Die Geschäftslogik befindet sich im Agenten, während die Speicherung an anderer Stelle erfolgt. Sie können das eine aktualisieren, ohne das andere zu zerstören.
Alles wird als Ereignis aufgezeichnet
Jede Aktion eines Agenten - das Aufrufen eines Erinnerungsprogramms, das Aufrufen eines Modells, das Generieren einer Antwort - wird als Ereignis aufgezeichnet.
Dies hat zwei praktische Vorteile. Erstens können Entwickler, wenn etwas schief geht, die gesamte Interaktion Schritt für Schritt wiederholen, um den genauen Fehlerpunkt zu finden. Zweitens liefert das System für die Prüfung und Einhaltung von Vorschriften eine vollständige Ausführungsspur für jede Benutzerinteraktion.
Präfix-basiertes Data Scoping
ADK steuert die Sichtbarkeit von Daten über einfache Schlüsselpräfixe:
- temp:xxx - nur innerhalb der aktuellen Sitzung sichtbar und automatisch entfernt, wenn diese endet
- user:xxx - wird von allen Sitzungen desselben Benutzers gemeinsam genutzt und ermöglicht dauerhafte Benutzereinstellungen
- app:xxx - wird von allen Benutzern gemeinsam genutzt und eignet sich für anwendungsübergreifendes Wissen, wie z. B. Produktdokumentation
Durch die Verwendung von Präfixen können Entwickler den Datenumfang kontrollieren, ohne zusätzliche Zugriffslogik zu schreiben. Das Framework verwaltet Sichtbarkeit und Lebensdauer automatisch.
Milvus als Speicher-Backend für ADK
Im ADK ist der MemoryService nur eine Schnittstelle. Sie definiert, wie der Langzeitspeicher verwendet wird, aber nicht, wie er gespeichert wird. Die Wahl der Datenbank bleibt dem Entwickler überlassen. Welche Art von Datenbank eignet sich also für das Speicher-Backend eines Agenten?
Was ein Agentenspeichersystem braucht - und wie Milvus es liefert
- Semantischer Abruf
Der Bedarf:
Benutzer stellen selten dieselbe Frage auf dieselbe Weise. "Es wird keine Verbindung hergestellt" und "Zeitüberschreitung der Verbindung" bedeuten das Gleiche. Das Speichersystem muss die Bedeutung verstehen und nicht nur Schlüsselwörter abgleichen.
Wie Milvus dies erfüllt:
Milvus unterstützt viele Vektor-Index-Typen wie HNSW und DiskANN, so dass die Entwickler wählen können, was zu ihrer Arbeitslast passt. Selbst bei Dutzenden von Millionen von Vektoren kann die Abfragelatenz unter 10 ms bleiben, was für den Einsatz von Agenten schnell genug ist.
- Hybride Abfragen
Der Bedarf:
Der Speicherabruf erfordert mehr als eine semantische Suche. Das System muss auch nach strukturierten Feldern wie user_id filtern, damit nur die Daten des aktuellen Benutzers zurückgegeben werden.
Wie Milvus dies erfüllt:
Milvus unterstützt von Haus aus hybride Abfragen, die Vektorsuche mit skalarer Filterung kombinieren. So können z. B. semantisch ähnliche Datensätze abgerufen werden, während in derselben Abfrage ein Filter wie user_id = 'xxx' angewendet wird, ohne dass die Leistung oder die Abrufqualität beeinträchtigt wird.
- Skalierbarkeit
Der Bedarf:
Wenn die Zahl der Nutzer und der gespeicherten Daten wächst, muss das System reibungslos skalieren. Die Leistung sollte bei steigendem Datenvolumen stabil bleiben, ohne plötzliche Verlangsamungen oder Ausfälle.
Wie Milvus diese Anforderungen erfüllt:
Milvus verwendet eine Architektur zur Trennung von Datenverarbeitung und Speicherung. Die Abfragekapazität kann durch Hinzufügen von Abfrageknoten nach Bedarf horizontal skaliert werden. Selbst die Standalone-Version, die auf einem einzigen Rechner läuft, kann Dutzende von Millionen von Vektoren verarbeiten und eignet sich daher für den Einsatz in der Anfangsphase.
Hinweis: Für lokale Entwicklung und Tests verwenden die Beispiele in diesem Artikel Milvus Lite oder Milvus Standalone.
Aufbau eines Agenten mit Long-TermMemory Powered by Milvus
In diesem Abschnitt bauen wir einen einfachen technischen Support-Agenten. Wenn ein Benutzer eine Frage stellt, sucht der Agent nach ähnlichen früheren Support-Tickets, um sie zu beantworten, anstatt die gleiche Arbeit zu wiederholen.
Dieses Beispiel ist nützlich, weil es drei häufige Probleme zeigt, mit denen echte Agentenspeichersysteme umgehen müssen.
- Langfristiges Gedächtnis über Sitzungen hinweg
Eine heute gestellte Frage kann sich auf ein Ticket beziehen, das vor Wochen erstellt wurde. Der Agent muss sich Informationen über mehrere Konversationen hinweg merken, nicht nur innerhalb der aktuellen Sitzung. Aus diesem Grund wird ein Langzeitspeicher benötigt, der über MemoryService verwaltet wird.
- Benutzerisolierung
Der Supportverlauf eines jeden Benutzers muss privat bleiben. Daten eines Benutzers dürfen niemals in den Ergebnissen eines anderen Benutzers erscheinen. Dies erfordert die Filterung von Feldern wie user_id, die Milvus durch hybride Abfragen unterstützt.
- Semantischer Abgleich
Benutzer beschreiben dasselbe Problem auf unterschiedliche Weise, z. B. "kann keine Verbindung herstellen" oder "Zeitüberschreitung". Der Abgleich von Schlüsselwörtern ist nicht genug. Der Agent benötigt eine semantische Suche, die durch Vektor-Retrieval bereitgestellt wird.
Einrichtung der Umgebung
- Python 3.11+
- Docker und Docker Compose
- Gemini-API-Schlüssel
Dieser Abschnitt behandelt die grundlegende Einrichtung, um sicherzustellen, dass das Programm korrekt ausgeführt werden kann.
pip install google-adk pymilvus google-generativeai
"""
ADK + Milvus + Gemini Long-term Memory Agent
Demonstrates how to implement a cross-session memory recall system
"""
import os
import asyncio
import time
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility
import google.generativeai as genai
from google.adk.agents import Agent
from google.adk.tools import FunctionTool
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
Schritt 1: Einsatz von Milvus Standalone (Docker)
(1) Laden Sie die Bereitstellungsdateien herunter.
wget <https://github.com/Milvus-io/Milvus/releases/download/v2.5.12/Milvus-standalone-docker-compose.yml> -O docker-compose.yml
(2) Starten Sie den Milvus-Dienst
docker-compose up -d
docker-compose ps -a
Schritt 2: Modell- und Verbindungskonfiguration
Konfigurieren Sie die Gemini-API und die Milvus-Verbindungseinstellungen.
# ==================== Configuration ====================
# 1. Gemini API configuration
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
raise ValueError("Please set the GOOGLE_API_KEY environment variable")
genai.configure(api_key=GOOGLE_API_KEY)
# 2. Milvus connection configuration
MILVUS_HOST = os.getenv("MILVUS_HOST", "localhost")
MILVUS_PORT = os.getenv("MILVUS_PORT", "19530")
# 3. Model selection (best combination within the free tier limits)
LLM_MODEL = "gemini-2.5-flash-lite" # LLM model: 1000 RPD
EMBEDDING_MODEL = "models/text-embedding-004" # Embedding model: 1000 RPD
EMBEDDING_DIM = 768 # Vector dimension
# 4. Application configuration
APP_NAME = "tech_support"
USER_ID = "user_123"
print(f"✓ Using model configuration:")
print(f" LLM: {LLM_MODEL}")
print(f" Embedding: {EMBEDDING_MODEL} (dimension: {EMBEDDING_DIM})")
Schritt 3 Initialisierung der Milvus-Datenbank
Erstellen einer Vektordatenbank-Sammlung (ähnlich einer Tabelle in einer relationalen Datenbank)
# ==================== Initialize Milvus ====================
def init_milvus():
"""Initialize Milvus connection and collection"""
# Step 1: Establish connection
Try:
connections.connect(
alias="default",
host=MILVUS_HOST,
port=MILVUS_PORT
)
print(f"✓ Connected to Milvus: {MILVUS_HOST}:{MILVUS_PORT}")
except Exception as e:
print(f"✗ Failed to connect to Milvus: {e}")
print("Hint: make sure Milvus is running")
Raise
# Step 2: Define data schema
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="user_id", dtype=DataType.VARCHAR, max_length=100),
FieldSchema(name="session_id", dtype=DataType.VARCHAR, max_length=100),
FieldSchema(name="question", dtype=DataType.VARCHAR, max_length=2000),
FieldSchema(name="solution", dtype=DataType.VARCHAR, max_length=5000),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=EMBEDDING_DIM),
FieldSchema(name="timestamp", dtype=DataType.INT64)
]
schema = CollectionSchema(fields, description="Tech support memory")
collection_name = "support_memory"
# Step 3: Create or load the collection
if utility.has_collection(collection_name):
memory_collection = Collection(name=collection_name)
print(f"✓ Collection '{collection_name}' already exists")
Else:
memory_collection = Collection(name=collection_name, schema=schema)
# Step 4: Create vector index
index_params = {
"index_type": "IVF_FLAT",
"metric_type": "COSINE",
"params": {"nlist": 128}
}
memory_collection.create_index(field_name="embedding", index_params=index_params)
print(f"✓ Created collection '{collection_name}' and index")
return memory_collection
# Run initialization
memory_collection = init_milvus()
Schritt 4 Funktionen für Speicheroperationen
Kapseln Sie die Speicher- und Abfragelogik als Werkzeuge für den Agenten.
(1) Speicherfunktion speichern
# ==================== Memory Operation Functions ====================
def store_memory(question: str, solution: str) -> str:
"""
Store a solution record into the memory store
Args:
question: the user's question
solution: the solution
Returns:
str: result message
"""
Try:
print(f"\\n[Tool Call] store_memory")
print(f" - question: {question[:50]}...")
print(f" - solution: {solution[:50]}...")
# Use global USER_ID (in production, this should come from ToolContext)
user_id = USER_ID
session_id = f"session_{int(time.time())}"
# Key step 1: convert the question into a 768-dimensional vector
embedding_result = genai.embed_content(
model=EMBEDDING_MODEL,
content=question,
task_type="retrieval_document", # specify document indexing task
output_dimensionality=EMBEDDING_DIM
)
embedding = embedding_result["embedding"]
# Key step 2: insert into Milvus
memory_collection.insert([{
"user_id": user_id,
"session_id": session_id,
"question": question,
"solution": solution,
"embedding": embedding,
"timestamp": int(time.time())
}])
# Key step 3: flush to disk (ensure data persistence)
memory_collection.flush()
result = "✓ Successfully stored in memory"
print(f"[Tool Result] {result}")
return result
except Exception as e:
error_msg = f"✗ Storage failed: {str(e)}"
print(f"[Tool Error] {error_msg}")
return error_msg
(2) Speicherfunktion abrufen
def recall_memory(query: str, top_k: int = 3) -> str:
"""
Retrieve relevant historical cases from the memory store
Args:
query: query question
top_k: number of most similar results to return
Returns:
str: retrieval result
"""
Try:
print(f"\\n[Tool Call] recall_memory")
print(f" - query: {query}")
print(f" - top_k: {top_k}")
user_id = USER_ID
# Key step 1: convert the query into a vector
embedding_result = genai.embed_content(
model=EMBEDDING_MODEL,
content=query,
task_type="retrieval_query", # specify query task (different from indexing)
output_dimensionality=EMBEDDING_DIM
)
query_embedding = embedding_result["embedding"]
# Key step 2: load the collection into memory (required for the first query)
memory_collection.load()
# Key step 3: hybrid search (vector similarity + scalar filtering)
results = memory_collection.search(
data=[query_embedding],
anns_field="embedding",
param={"metric_type": "COSINE", "params": {"nprobe": 10}},
limit=top_k,
expr=f'user_id == "{user_id}"', # 🔑 key to user isolation
output_fields=["question", "solution", "timestamp"]
)
# Key step 4: format results
if not results[0]:
result = "No relevant historical cases found"
print(f"[Tool Result] {result}")
return result
result_text = f"Found {len(results[0])} relevant cases:\\n\\n"
for i, hit in enumerate(results[0]):
result_text += f"Case {i+1} (similarity: {hit.score:.2f}):\\n"
result_text += f"Question: {hit.entity.get('question')}\\n"
result_text += f"Solution: {hit.entity.get('solution')}\\n\\n"
print(f"[Tool Result] Found {len(results[0])} cases")
return result_text
except Exception as e:
error_msg = f"Retrieval failed: {str(e)}"
print(f"[Tool Error] {error_msg}")
return error_msg
(3) Registrierung als ADK-Werkzeug
# Usage
# Wrap functions with FunctionTool
store_memory_tool = FunctionTool(func=store_memory)
recall_memory_tool = FunctionTool(func=recall_memory)
memory_tools = [store_memory_tool, recall_memory_tool]
Schritt 5 Agentendefinition
Kerngedanke: Definition der Verhaltenslogik des Agenten.
# ==================== Create Agent ====================
support_agent = Agent(
model=LLM_MODEL,
name="support_agent",
description="Technical support expert agent that can remember and recall historical cases",
# Key: the instruction defines the agent’s behavior
instruction="""
You are a technical support expert. Strictly follow the process below:
<b>When the user asks a technical question:</b>
1. Immediately call the recall_memory tool to search for historical cases
- Parameter query: use the user’s question text directly
- Do not ask for any additional information; call the tool directly
2. Answer based on the retrieval result:
- If relevant cases are found: explain that similar historical cases were found and answer by referencing their solutions
- If no cases are found: explain that this is a new issue and answer based on your own knowledge
3. After answering, ask: “Did this solution resolve your issue?”
<b>When the user confirms the issue is resolved:</b>
- Immediately call the store_memory tool to save this Q&A
- Parameter question: the user’s original question
- Parameter solution: the complete solution you provided
<b>Important rules:</b>
- You must call a tool before answering
- Do not ask for user_id or any other parameters
- Only store memory when you see confirmation phrases such as “resolved”, “it works”, or “thanks”
""",
tools=memory_tools
)
Schritt 6 Hauptprogramm und Ausführungsablauf
Demonstriert den kompletten Prozess des sitzungsübergreifenden Speicherabrufs.
# ==================== Main Program ====================
async def main():
"""Demonstrate cross-session memory recall"""
# Create Session service and Runner
session_service = InMemorySessionService()
runner = Runner(
agent=support_agent,
app_name=APP_NAME,
session_service=session_service
)
# ========== First round: build memory ==========
print("\\n" + "=" \* 60)
print("First conversation: user asks a question and the solution is stored")
print("=" \* 60)
session1 = await session_service.create_session(
app_name=APP_NAME,
user_id=USER_ID,
session_id="session_001"
)
# User asks the first question
print("\\n[User]: What should I do if Milvus connection times out?")
content1 = types.Content(
role='user',
parts=[types.Part(text="What should I do if Milvus connection times out?")]
)
async for event in runner.run_async(
user_id=USER_ID,
session_id=[session1.id](http://session1.id),
new_message=content1
):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, 'text') and part.text:
print(f"[Agent]: {part.text}")
# User confirms the issue is resolved
print("\\n[User]: The issue is resolved, thanks!")
content2 = types.Content(
role='user',
parts=[types.Part(text="The issue is resolved, thanks!")]
)
async for event in runner.run_async(
user_id=USER_ID,
session_id=[session1.id](http://session1.id),
new_message=content2
):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, 'text') and part.text:
print(f"[Agent]: {part.text}")
# ========== Second round: recall memory ==========
print("\\n" + "=" \* 60)
print("Second conversation: new session with memory recall")
print("=" \* 60)
session2 = await session_service.create_session(
app_name=APP_NAME,
user_id=USER_ID,
session_id="session_002"
)
# User asks a similar question in a new session
print("\\n[User]: Milvus can't connect")
content3 = types.Content(
role='user',
parts=[types.Part(text="Milvus can't connect")]
)
async for event in runner.run_async(
user_id=USER_ID,
session_id=[session2.id](http://session2.id),
new_message=content3
):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, 'text') and part.text:
print(f"[Agent]: {part.text}")
# Program entry point
if name == "main":
Try:
asyncio.run(main())
except KeyboardInterrupt:
print(“\n\nProgram exited”)
except Exception as e:
print(f"\n\nProgram error: {e}")
import traceback
traceback.print_exc()
Finally:
Try:
connections.disconnect(alias=“default”)
print(“\n✓ Disconnected from Milvus”)
Except:
pass
Schritt 7 Ausführen und Testen
(1) Setzen von Umgebungsvariablen
export GOOGLE_API_KEY="your-gemini-api-key"
python milvus_agent.py
Erwartete Ausgabe
Die Ausgabe zeigt, wie das Speichersystem in der Praxis funktioniert.
In der ersten Konversation fragt der Benutzer, wie er mit einer Zeitüberschreitung der Milvus-Verbindung umgehen soll. Der Agent gibt eine Lösung vor. Nachdem der Benutzer bestätigt hat, dass das Problem gelöst ist, speichert der Agent diese Frage und die Antwort im Speicher.
In der zweiten Konversation beginnt eine neue Sitzung. Der Benutzer stellt die gleiche Frage mit anderen Worten: "Milvus kann keine Verbindung herstellen." Der Agent ruft automatisch einen ähnlichen Fall aus dem Speicher ab und gibt die gleiche Lösung.
Es sind keine manuellen Schritte erforderlich. Der Agent entscheidet, wann er frühere Fälle abruft und wann er neue speichert, und zeigt damit drei Schlüsselfähigkeiten: sitzungsübergreifendes Gedächtnis, semantischer Abgleich und Benutzerisolierung.
Schlussfolgerung
ADK trennt Kurzzeitkontext und Langzeitgedächtnis auf Framework-Ebene durch SessionService und MemoryService. Milvus übernimmt die semantische Suche und die Filterung auf Benutzerebene durch vektorbasiertes Retrieval.
Bei der Wahl eines Frameworks kommt es auf das Ziel an. Wenn Sie eine starke Statusisolierung, Auditierbarkeit und Produktionsstabilität benötigen, ist ADK die bessere Wahl. Wenn Sie Prototypen erstellen oder experimentieren möchten, bietet LangChain (ein beliebtes Python-Framework zur schnellen Erstellung von LLM-basierten Anwendungen und Agenten) mehr Flexibilität.
Der Schlüssel zum Agenten-Speicher ist die Datenbank. Das semantische Gedächtnis hängt von Vektordatenbanken ab, ganz gleich, welches Framework Sie verwenden. Milvus eignet sich gut, weil es quelloffen ist, lokal läuft, die Verarbeitung von Milliarden von Vektoren auf einem einzigen Rechner unterstützt und eine hybride Vektor-, Skalar- und Volltextsuche ermöglicht. Diese Funktionen eignen sich sowohl für frühe Tests als auch für den Produktionseinsatz.
Wir hoffen, dass dieser Artikel Ihnen hilft, das Design von Agentenspeichern besser zu verstehen und die richtigen Tools für Ihre Projekte auszuwählen.
Wenn Sie KI-Agenten entwickeln, die echten Speicher - und nicht nur größere Kontextfenster - benötigen, würden wir uns freuen, wenn Sie uns mitteilen, wie Sie dabei vorgehen.
Haben Sie Fragen zu ADK, Agenten-Speicherdesign oder zur Verwendung von Milvus als Speicher-Backend? Treten Sie unserem Slack-Kanal bei oder buchen Sie eine 20-minütige Milvus-Sprechstunde, um Ihren Anwendungsfall zu besprechen.
Try Managed Milvus for Free
Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.
Get StartedLike the article? Spread the word



