Google ADK와 Milvus를 사용하여 장기 기억을 갖춘 프로덕션 지원 AI 에이전트를 구축하는 방법
지능형 에이전트를 구축할 때 가장 어려운 문제 중 하나는 에이전트가 기억해야 할 것과 잊어버려야 할 것을 결정하는 메모리 관리입니다.
모든 메모리가 지속되는 것은 아닙니다. 일부 데이터는 현재 대화에만 필요하며 대화가 끝나면 지워져야 합니다. 사용자 기본 설정과 같은 다른 데이터는 대화 전반에 걸쳐 유지되어야 합니다. 이러한 데이터가 혼합되면 임시 데이터가 쌓이고 중요한 정보가 손실됩니다.
진짜 문제는 아키텍처입니다. 대부분의 프레임워크는 단기 메모리와 장기 메모리를 명확하게 구분하지 않기 때문에 개발자가 수동으로 처리해야 합니다.
2025년에 출시된 Google의 오픈 소스 ADK(에이전트 개발 키트)는 메모리 관리를 최우선 과제로 삼아 프레임워크 수준에서 이 문제를 해결합니다. 이 키트는 단기 세션 메모리와 장기 메모리를 기본적으로 분리하도록 강제합니다.
이 글에서는 이러한 분리가 실제로 어떻게 작동하는지 살펴보겠습니다. Milvus를 벡터 데이터베이스로 사용하여 실제 장기 메모리를 갖춘 프로덕션 지원 에이전트를 처음부터 처음부터 구축해 보겠습니다.
ADK의 핵심 설계 원칙
ADK는 개발자의 메모리 관리 부담을 덜어주기 위해 설계되었습니다. 프레임워크는 단기 세션 데이터와 장기 메모리를 자동으로 분리하고 각각을 적절하게 처리합니다. 이는 네 가지 핵심 설계를 통해 이루어집니다.
단기 및 장기 메모리를 위한 내장 인터페이스
모든 ADK 에이전트에는 메모리 관리를 위한 두 가지 기본 제공 인터페이스가 있습니다:
세션 서비스(임시 데이터)
- 저장 내용: 현재 대화 콘텐츠 및 도구 호출의 중간 결과
- 지워지는 시기: 세션이 종료되면 자동으로 지워짐
- 저장 위치: 메모리(가장 빠른), 데이터베이스 또는 클라우드 서비스
MemoryService(장기 메모리)
- 저장 내용: 사용자 기본 설정이나 과거 기록 등 기억해야 하는 정보
- 삭제시기: 자동으로 삭제되지 않으며 수동으로 삭제해야 합니다.
- 저장 위치: ADK는 인터페이스만 정의하며, 스토리지 백엔드는 사용자에게 달려 있습니다(예: Milvus).
3계층 아키텍처
ADK는 시스템을 세 개의 레이어로 분할하여 각각 하나의 책임을 맡깁니다:
- 에이전트 계층: "사용자에게 응답하기 전에 관련 메모리 검색"과 같은 비즈니스 로직이 있는 곳입니다.
- 런타임 계층: 프레임워크에서 관리하며 세션 생성 및 삭제, 각 실행 단계 추적을 담당합니다.
- 서비스 계층: Milvus와 같은 벡터 데이터베이스나 대규모 모델 API와 같은 외부 시스템과 통합됩니다.
이 구조는 비즈니스 로직은 에이전트에, 스토리지는 다른 곳에 보관하는 방식으로 문제를 분리합니다. 한 쪽을 업데이트해도 다른 쪽을 손상시키지 않고 업데이트할 수 있습니다.
모든 것이 이벤트로 기록됨
에이전트가 수행하는 모든 작업(메모리 리콜 도구 호출, 모델 호출, 응답 생성 등)은 이벤트로 기록됩니다.
여기에는 두 가지 실질적인 이점이 있습니다. 첫째, 문제가 발생하면 개발자는 전체 상호작용을 단계별로 재생하여 정확한 실패 지점을 찾을 수 있습니다. 둘째, 감사 및 규정 준수를 위해 시스템은 각 사용자 상호 작용의 완전한 실행 추적을 제공합니다.
접두사 기반 데이터 범위 지정
ADK는 간단한 키 접두사를 사용하여 데이터 가시성을 제어합니다:
- temp:xxx - 현재 세션 내에서만 표시되며 세션이 종료되면 자동으로 제거됩니다.
- user:xxx - 동일한 사용자의 모든 세션에서 공유되어 영구적인 사용자 환경 설정이 가능합니다.
- app:xxx - 모든 사용자에게 전역적으로 공유되며, 제품 문서와 같은 애플리케이션 전반의 지식에 적합합니다.
접두사를 사용하면 개발자는 별도의 액세스 로직을 작성하지 않고도 데이터 범위를 제어할 수 있습니다. 프레임워크는 가시성과 수명을 자동으로 처리합니다.
ADK의 메모리 백엔드로서의 Milvus
ADK에서 MemoryService는 인터페이스에 불과합니다. 장기 메모리가 사용되는 방식은 정의하지만 저장되는 방식은 정의하지 않습니다. 데이터베이스를 선택하는 것은 개발자의 몫입니다. 그렇다면 어떤 종류의 데이터베이스가 에이전트의 메모리 백엔드로 잘 작동할까요?
에이전트 메모리 시스템에 필요한 것 - 그리고 Milvus가 제공하는 방법
- 시맨틱 검색
필요성:
사용자는 같은 질문을 같은 방식으로 하는 경우가 거의 없습니다. "연결이 안 돼요"와 "연결 시간 초과"는 같은 의미입니다. 메모리 시스템은 단순히 키워드를 일치시키는 것이 아니라 의미를 이해해야 합니다.
Milvus가 이를 충족하는 방법:
Milvus는 개발자가 워크로드에 맞는 것을 선택할 수 있도록 HNSW 및 DiskANN과 같은 다양한 벡터 인덱스 유형을 지원합니다. 수천만 개의 벡터를 사용하더라도 쿼리 대기 시간은 에이전트가 사용하기에 충분히 빠른 10ms 미만을 유지할 수 있습니다.
- 하이브리드 쿼리
필요성:
메모리 리콜에는 시맨틱 검색 이상의 기능이 필요합니다. 또한 시스템은 현재 사용자의 데이터만 반환되도록 user_id와 같은 구조화된 필드를 기준으로 필터링해야 합니다.
Milvus가 이를 충족하는 방법:
Milvus는 기본적으로 벡터 검색과 스칼라 필터링을 결합한 하이브리드 쿼리를 지원합니다. 예를 들어, 동일한 쿼리에서 user_id = 'xxx'와 같은 필터를 적용하면서 의미적으로 유사한 레코드를 검색할 수 있으며, 성능이나 리콜 품질에 영향을 미치지 않습니다.
- 확장성
필요성:
사용자 수와 저장된 메모리가 증가함에 따라 시스템은 원활하게 확장되어야 합니다. 데이터가 증가해도 갑작스러운 속도 저하나 장애 없이 성능이 안정적으로 유지되어야 합니다.
Milvus가 이를 충족하는 방법:
Milvus는 컴퓨팅-스토리지 분리 아키텍처를 사용합니다. 필요에 따라 쿼리 노드를 추가하여 쿼리 용량을 수평적으로 확장할 수 있습니다. 단일 머신에서 실행되는 독립형 버전도 수천만 개의 벡터를 처리할 수 있으므로 초기 배포에 적합합니다.
참고: 로컬 개발 및 테스트의 경우, 이 문서의 예제에서는 Milvus Lite 또는 Milvus Standalone을 사용합니다.
Milvus 기반 롱텀메모리로 에이전트 구축하기
이 섹션에서는 간단한 기술 지원 에이전트를 구축합니다. 사용자가 질문을 하면 상담원이 동일한 작업을 반복하지 않고 유사한 과거의 지원 티켓을 찾아서 답변합니다.
이 예는 실제 상담원 메모리 시스템이 처리해야 하는 세 가지 일반적인 문제를 보여주기 때문에 유용합니다.
- 세션 전반의 장기 기억
오늘 받은 질문이 몇 주 전에 만든 티켓과 관련되어 있을 수 있습니다. 상담원은 현재 세션 내에서뿐만 아니라 대화 전반에 걸친 정보를 기억해야 합니다. 그렇기 때문에 MemoryService를 통해 관리되는 장기 메모리가 필요합니다.
- 사용자 격리
각 사용자의 지원 기록은 비공개로 유지되어야 합니다. 한 사용자의 데이터가 다른 사용자의 결과에 나타나지 않아야 합니다. 이를 위해서는 user_id와 같은 필드에 대한 필터링이 필요하며, Milvus는 하이브리드 쿼리를 통해 이를 지원합니다.
- 시맨틱 매칭
사용자는 "연결할 수 없음" 또는 "시간 초과"와 같이 동일한 문제를 서로 다른 방식으로 설명합니다. 키워드 매칭만으로는 충분하지 않습니다. 에이전트는 벡터 검색을 통해 제공되는 시맨틱 검색이 필요합니다.
환경 설정
- Python 3.11 이상
- 도커 및 도커 컴포즈
- Gemini API 키
이 섹션에서는 프로그램이 올바르게 실행될 수 있도록 하기 위한 기본 설정에 대해 설명합니다.
pip install google-adk pymilvus google-generativeai
"""
ADK + Milvus + Gemini Long-term Memory Agent
Demonstrates how to implement a cross-session memory recall system
"""
import os
import asyncio
import time
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility
import google.generativeai as genai
from google.adk.agents import Agent
from google.adk.tools import FunctionTool
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
1단계: Milvus 독립형 배포(Docker)
(1) 배포 파일 다운로드
wget <https://github.com/Milvus-io/Milvus/releases/download/v2.5.12/Milvus-standalone-docker-compose.yml> -O docker-compose.yml
(2) Milvus 서비스 시작
docker-compose up -d
docker-compose ps -a
2단계 모델 및 연결 구성
Gemini API 및 Milvus 연결 설정을 구성합니다.
# ==================== Configuration ====================
# 1. Gemini API configuration
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
raise ValueError("Please set the GOOGLE_API_KEY environment variable")
genai.configure(api_key=GOOGLE_API_KEY)
# 2. Milvus connection configuration
MILVUS_HOST = os.getenv("MILVUS_HOST", "localhost")
MILVUS_PORT = os.getenv("MILVUS_PORT", "19530")
# 3. Model selection (best combination within the free tier limits)
LLM_MODEL = "gemini-2.5-flash-lite" # LLM model: 1000 RPD
EMBEDDING_MODEL = "models/text-embedding-004" # Embedding model: 1000 RPD
EMBEDDING_DIM = 768 # Vector dimension
# 4. Application configuration
APP_NAME = "tech_support"
USER_ID = "user_123"
print(f"✓ Using model configuration:")
print(f" LLM: {LLM_MODEL}")
print(f" Embedding: {EMBEDDING_MODEL} (dimension: {EMBEDDING_DIM})")
3단계 Milvus 데이터베이스 초기화
벡터 데이터베이스 컬렉션을 생성합니다(관계형 데이터베이스의 테이블과 유사).
# ==================== Initialize Milvus ====================
def init_milvus():
"""Initialize Milvus connection and collection"""
# Step 1: Establish connection
Try:
connections.connect(
alias="default",
host=MILVUS_HOST,
port=MILVUS_PORT
)
print(f"✓ Connected to Milvus: {MILVUS_HOST}:{MILVUS_PORT}")
except Exception as e:
print(f"✗ Failed to connect to Milvus: {e}")
print("Hint: make sure Milvus is running")
Raise
# Step 2: Define data schema
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="user_id", dtype=DataType.VARCHAR, max_length=100),
FieldSchema(name="session_id", dtype=DataType.VARCHAR, max_length=100),
FieldSchema(name="question", dtype=DataType.VARCHAR, max_length=2000),
FieldSchema(name="solution", dtype=DataType.VARCHAR, max_length=5000),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=EMBEDDING_DIM),
FieldSchema(name="timestamp", dtype=DataType.INT64)
]
schema = CollectionSchema(fields, description="Tech support memory")
collection_name = "support_memory"
# Step 3: Create or load the collection
if utility.has_collection(collection_name):
memory_collection = Collection(name=collection_name)
print(f"✓ Collection '{collection_name}' already exists")
Else:
memory_collection = Collection(name=collection_name, schema=schema)
# Step 4: Create vector index
index_params = {
"index_type": "IVF_FLAT",
"metric_type": "COSINE",
"params": {"nlist": 128}
}
memory_collection.create_index(field_name="embedding", index_params=index_params)
print(f"✓ Created collection '{collection_name}' and index")
return memory_collection
# Run initialization
memory_collection = init_milvus()
4단계 메모리 연산 기능
에이전트를 위한 도구로 저장 및 검색 로직을 캡슐화합니다.
(1) 메모리 저장 함수
# ==================== Memory Operation Functions ====================
def store_memory(question: str, solution: str) -> str:
"""
Store a solution record into the memory store
Args:
question: the user's question
solution: the solution
Returns:
str: result message
"""
Try:
print(f"\\n[Tool Call] store_memory")
print(f" - question: {question[:50]}...")
print(f" - solution: {solution[:50]}...")
# Use global USER_ID (in production, this should come from ToolContext)
user_id = USER_ID
session_id = f"session_{int(time.time())}"
# Key step 1: convert the question into a 768-dimensional vector
embedding_result = genai.embed_content(
model=EMBEDDING_MODEL,
content=question,
task_type="retrieval_document", # specify document indexing task
output_dimensionality=EMBEDDING_DIM
)
embedding = embedding_result["embedding"]
# Key step 2: insert into Milvus
memory_collection.insert([{
"user_id": user_id,
"session_id": session_id,
"question": question,
"solution": solution,
"embedding": embedding,
"timestamp": int(time.time())
}])
# Key step 3: flush to disk (ensure data persistence)
memory_collection.flush()
result = "✓ Successfully stored in memory"
print(f"[Tool Result] {result}")
return result
except Exception as e:
error_msg = f"✗ Storage failed: {str(e)}"
print(f"[Tool Error] {error_msg}")
return error_msg
(2) 검색 메모리 함수
def recall_memory(query: str, top_k: int = 3) -> str:
"""
Retrieve relevant historical cases from the memory store
Args:
query: query question
top_k: number of most similar results to return
Returns:
str: retrieval result
"""
Try:
print(f"\\n[Tool Call] recall_memory")
print(f" - query: {query}")
print(f" - top_k: {top_k}")
user_id = USER_ID
# Key step 1: convert the query into a vector
embedding_result = genai.embed_content(
model=EMBEDDING_MODEL,
content=query,
task_type="retrieval_query", # specify query task (different from indexing)
output_dimensionality=EMBEDDING_DIM
)
query_embedding = embedding_result["embedding"]
# Key step 2: load the collection into memory (required for the first query)
memory_collection.load()
# Key step 3: hybrid search (vector similarity + scalar filtering)
results = memory_collection.search(
data=[query_embedding],
anns_field="embedding",
param={"metric_type": "COSINE", "params": {"nprobe": 10}},
limit=top_k,
expr=f'user_id == "{user_id}"', # 🔑 key to user isolation
output_fields=["question", "solution", "timestamp"]
)
# Key step 4: format results
if not results[0]:
result = "No relevant historical cases found"
print(f"[Tool Result] {result}")
return result
result_text = f"Found {len(results[0])} relevant cases:\\n\\n"
for i, hit in enumerate(results[0]):
result_text += f"Case {i+1} (similarity: {hit.score:.2f}):\\n"
result_text += f"Question: {hit.entity.get('question')}\\n"
result_text += f"Solution: {hit.entity.get('solution')}\\n\\n"
print(f"[Tool Result] Found {len(results[0])} cases")
return result_text
except Exception as e:
error_msg = f"Retrieval failed: {str(e)}"
print(f"[Tool Error] {error_msg}")
return error_msg
(3) ADK 도구로 등록
# Usage
# Wrap functions with FunctionTool
store_memory_tool = FunctionTool(func=store_memory)
recall_memory_tool = FunctionTool(func=recall_memory)
memory_tools = [store_memory_tool, recall_memory_tool]
5단계 에이전트 정의
핵심 아이디어: 에이전트의 동작 로직을 정의합니다.
# ==================== Create Agent ====================
support_agent = Agent(
model=LLM_MODEL,
name="support_agent",
description="Technical support expert agent that can remember and recall historical cases",
# Key: the instruction defines the agent’s behavior
instruction="""
You are a technical support expert. Strictly follow the process below:
<b>When the user asks a technical question:</b>
1. Immediately call the recall_memory tool to search for historical cases
- Parameter query: use the user’s question text directly
- Do not ask for any additional information; call the tool directly
2. Answer based on the retrieval result:
- If relevant cases are found: explain that similar historical cases were found and answer by referencing their solutions
- If no cases are found: explain that this is a new issue and answer based on your own knowledge
3. After answering, ask: “Did this solution resolve your issue?”
<b>When the user confirms the issue is resolved:</b>
- Immediately call the store_memory tool to save this Q&A
- Parameter question: the user’s original question
- Parameter solution: the complete solution you provided
<b>Important rules:</b>
- You must call a tool before answering
- Do not ask for user_id or any other parameters
- Only store memory when you see confirmation phrases such as “resolved”, “it works”, or “thanks”
""",
tools=memory_tools
)
6단계 주요 프로그램 및 실행 흐름
세션 간 메모리 검색의 전체 프로세스를 시연합니다.
# ==================== Main Program ====================
async def main():
"""Demonstrate cross-session memory recall"""
# Create Session service and Runner
session_service = InMemorySessionService()
runner = Runner(
agent=support_agent,
app_name=APP_NAME,
session_service=session_service
)
# ========== First round: build memory ==========
print("\\n" + "=" \* 60)
print("First conversation: user asks a question and the solution is stored")
print("=" \* 60)
session1 = await session_service.create_session(
app_name=APP_NAME,
user_id=USER_ID,
session_id="session_001"
)
# User asks the first question
print("\\n[User]: What should I do if Milvus connection times out?")
content1 = types.Content(
role='user',
parts=[types.Part(text="What should I do if Milvus connection times out?")]
)
async for event in runner.run_async(
user_id=USER_ID,
session_id=[session1.id](http://session1.id),
new_message=content1
):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, 'text') and part.text:
print(f"[Agent]: {part.text}")
# User confirms the issue is resolved
print("\\n[User]: The issue is resolved, thanks!")
content2 = types.Content(
role='user',
parts=[types.Part(text="The issue is resolved, thanks!")]
)
async for event in runner.run_async(
user_id=USER_ID,
session_id=[session1.id](http://session1.id),
new_message=content2
):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, 'text') and part.text:
print(f"[Agent]: {part.text}")
# ========== Second round: recall memory ==========
print("\\n" + "=" \* 60)
print("Second conversation: new session with memory recall")
print("=" \* 60)
session2 = await session_service.create_session(
app_name=APP_NAME,
user_id=USER_ID,
session_id="session_002"
)
# User asks a similar question in a new session
print("\\n[User]: Milvus can't connect")
content3 = types.Content(
role='user',
parts=[types.Part(text="Milvus can't connect")]
)
async for event in runner.run_async(
user_id=USER_ID,
session_id=[session2.id](http://session2.id),
new_message=content3
):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, 'text') and part.text:
print(f"[Agent]: {part.text}")
# Program entry point
if name == "main":
Try:
asyncio.run(main())
except KeyboardInterrupt:
print(“\n\nProgram exited”)
except Exception as e:
print(f"\n\nProgram error: {e}")
import traceback
traceback.print_exc()
Finally:
Try:
connections.disconnect(alias=“default”)
print(“\n✓ Disconnected from Milvus”)
Except:
pass
7단계 실행 및 테스트
(1) 환경 변수 설정
export GOOGLE_API_KEY="your-gemini-api-key"
python milvus_agent.py
예상 출력
출력은 메모리 시스템이 실제 사용 시 어떻게 작동하는지 보여줍니다.
첫 번째 대화에서 사용자는 Milvus 연결 시간 초과를 처리하는 방법을 묻습니다. 상담원이 해결책을 제시합니다. 사용자가 문제가 해결되었음을 확인한 후 상담원은 이 질문과 답변을 메모리에 저장합니다.
두 번째 대화에서는 새 세션이 시작됩니다. 사용자가 다른 단어를 사용하여 동일한 질문을 합니다: "밀버스가 연결할 수 없어요." 상담원이 메모리에서 유사한 사례를 자동으로 검색하여 동일한 솔루션을 제공합니다.
수동 단계가 필요하지 않습니다. 상담원은 세션 간 메모리, 시맨틱 매칭, 사용자 격리라는 세 가지 주요 기능을 통해 과거 사례를 검색할 때와 새 사례를 저장할 때를 결정합니다.
결론
ADK는 프레임워크 수준에서 세션서비스와 메모리서비스를 사용하여 단기 컨텍스트와 장기 메모리를 분리합니다. Milvus는 벡터 기반 검색을 통해 시맨틱 검색과 사용자 수준 필터링을 처리합니다.
프레임워크를 선택할 때는 목표가 중요합니다. 강력한 상태 격리, 감사 가능성 및 프로덕션 안정성이 필요하다면 ADK가 더 적합합니다. 프로토타이핑이나 실험을 하는 경우, LLM 기반 애플리케이션과 에이전트를 빠르게 구축하는 데 널리 사용되는 Python 프레임워크인 LangChain이 더 많은 유연성을 제공합니다.
에이전트 메모리의 경우 핵심은 데이터베이스입니다. 시맨틱 메모리는 어떤 프레임워크를 사용하든 벡터 데이터베이스에 의존합니다. Milvus는 오픈 소스이고 로컬에서 실행되며 단일 머신에서 수십억 개의 벡터 처리를 지원하고 하이브리드 벡터, 스칼라 및 전체 텍스트 검색을 지원하기 때문에 잘 작동합니다. 이러한 기능은 초기 테스트와 프로덕션 사용 모두에 적용됩니다.
이 글이 에이전트 메모리 설계를 더 잘 이해하고 프로젝트에 적합한 도구를 선택하는 데 도움이 되기를 바랍니다.
더 큰 컨텍스트 창이 아닌 실제 메모리가 필요한 AI 에이전트를 구축하는 경우 어떻게 접근하고 계신지 궁금합니다.
ADK, 에이전트 메모리 설계 또는 Milvus를 메모리 백엔드로 사용하는 것에 대해 질문이 있으신가요? Slack 채널에 참여하거나 20분 동안 진행되는 Milvus 오피스 아워 세션을 예약하여 사용 사례에 대해 이야기해 보세요.
Try Managed Milvus for Free
Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.
Get StartedLike the article? Spread the word



