🚀 완전 관리형 Milvus인 Zilliz Cloud를 무료로 체험해보세요—10배 더 빠른 성능을 경험하세요! 지금 체험하기>>

milvus-logo
LFAI
홈페이지
  • 튜토리얼

튜토리얼: asyncio와 함께 AsyncMilvusClient 사용하기

AsyncMilvusClient는 asyncio를 통해 Milvus에 대한 비차단 액세스를 위한 코루틴 기반 API를 제공하는 비동기식 MilvusClient입니다. 이 문서에서는 AsyncMilvusClient가 제공하는 API를 호출하는 과정과 주의해야 할 측면에 대해 설명합니다.

개요

Asyncio는 async/await 구문을 사용하여 동시 코드를 작성하기 위한 라이브러리로, Milvus의 고성능 비동기 클라이언트의 기반이 되는 라이브러리이며, asyncio 위에서 실행되는 코드 라이브러리에 적합할 것입니다.

AsyncMilvusClient가 제공하는 메서드는 MilvusClient와 동일한 매개변수 집합과 동작을 가지고 있습니다. 유일한 차이점은 호출 방식에 있습니다. 다음 표에는 AsyncMilvusClient에서 사용할 수 있는 메서드가 나열되어 있습니다.

클라이언트

close()

수집 및 파티션

create_collection()

drop_collection()

create_partition()

drop_partition()

색인

create_index()

drop_index()

load_collection()

release_collection()

load_partitions()

release_partitions()

벡터

insert()

upsert()

delete()

search()

query()

hybrid_search()

get()

다른 MilvusClient 메서드의 비동기 버전이 여전히 필요한 경우 pymilvus 리포지토리에 기능 요청을 제출할 수 있습니다. 코드 기여도 환영합니다.

이벤트 루프 만들기

asyncio를 사용하는 애플리케이션은 일반적으로 이벤트 루프를 비동기 작업과 입출력 작업을 관리하는 오케스트레이터로 사용합니다. 이 튜토리얼에서는 asyncio에서 이벤트 루프를 가져와서 오케스트레이터로 사용하겠습니다.

import asyncio
import numpy as np
from scipy.sparse import csr_matrix
from pymilvus import MilvusClient, AsyncMilvusClient, DataType, RRFRanker, AnnSearchRequest

loop = asyncio.get_event_loop()

AsyncMilvusClient로 연결하기

다음 예제는 Milvus를 비동기 방식으로 연결하는 방법을 보여줍니다.

# Connect to Milvus server using AsyncMilvusClient
async_client = AsyncMilvusClient(
    uri="http://localhost:19530",
    token="root:Milvus"
)

스키마 만들기

현재 AsyncMilvusClient에서는 create_schema() 을 사용할 수 없습니다. 대신 MilvusClient를 사용하여 컬렉션의 스키마를 생성하겠습니다.

schema = async_client.create_schema(
    auto_id=False,
    description="This is a sample schema",
)

schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("dense_vector", DataType.FLOAT_VECTOR, dim=5)
schema.add_field("sparse_vector", DataType.SPARSE_FLOAT_VECTOR)
schema.add_field("text", DataType.VARCHAR, max_length=512)

AsyncMilvusClient는 create_schema() 메서드를 동기적으로 호출하므로 이벤트 루프를 사용하여 호출을 오케스트레이션할 필요가 없습니다.

컬렉션 만들기

이제 스키마를 사용하여 컬렉션을 생성하겠습니다. AsyncMilvusClient 메서드 호출에 await 키워드를 접두사로 붙이고 다음과 같이 async 함수 안에 호출을 배치해야 한다는 점에 유의하세요.

async def create_my_collection(collection_name, schema):
    if (client.has_collection(collection_name)):
        await async_client.drop_collection(collection_name)

    await async_client.create_collection(
        collection_name=collection_name,
        schema=schema
    )

    if (client.has_collection(collection_name)):
        print("Collection created successfully")
    else:
        print("Failed to create collection")
        
# Call the above function asynchronously 
loop.run_until_complete(create_my_collection("my_collection", schema))

# Output
#
# Collection created successfully

인덱스 만들기

또한 모든 벡터 필드와 선택적 스칼라 필드에 대한 인덱스를 만들어야 합니다. 위에 정의된 스키마에 따르면 컬렉션에는 두 개의 벡터 필드가 있으며, 이에 대한 인덱스를 다음과 같이 생성합니다.

async def create_indexes(collection_name):
    index_params = client.prepare_index_params()

    index_params.add_index(field_name="dense_vector", index_type="AUTOINDEX", metric_type="IP")
    index_params.add_index(field_name="sparse_vector", index_type="AUTOINDEX", metric_type="IP")
    index_params.add_index(field_name="text", index_type="AUTOINDEX")

    await async_client.create_index(collection_name, index_params)

# Call the above function asynchronously 
loop.run_until_complete(create_indexes("my_collection"))

컬렉션 로드

필요한 필드가 색인된 후에 컬렉션을 로드할 수 있습니다. 다음 코드는 컬렉션을 비동기적으로 로드하는 방법을 보여줍니다.

async def load_my_collection(collection_name):
    await async_client.load_collection(collection_name)
    print(client.get_load_state(collection_name))
    
# Call the above function asynchronously 
loop.run_until_complete(load_my_collection("my_collection"))

# Output
#
# {'state': <LoadState: Loaded>}

데이터 삽입

pymilvus에서 제공되는 임베딩 모델을 사용하여 텍스트에 대한 벡터 임베딩을 생성할 수 있습니다. 자세한 내용은 임베딩 개요를 참조하세요. 이 섹션에서는 무작위로 생성된 데이터를 컬렉션에 삽입해 보겠습니다.

async def insert_sample_data(collection_name):
    # Randomly generated data will be used here
    rng = np.random.default_rng(42)

    def generate_random_text(length):
        seed = "this is a seed paragraph to generate random text, which is used for testing purposes. Specifically, a random text is generated by randomly selecting words from this sentence."
        words = seed.split()
        return " ".join(rng.choice(words, length))
    
    data = [{
        'id': i, 
        'dense_vector': rng.random(5).tolist(), 
        'sparse_vector': csr_matrix(rng.random(5)), 
        'text': generate_random_text(10)
    } for i in range(10000)]

    res = await async_client.insert(collection_name, data)

    print(res)

# Call the above function asynchronously 
loop.run_until_complete(insert_sample_data("my_collection"))

# Output
#
# {'insert_count': 10000, 'ids': [0, 1, 2, 3, ..., 9999]}

쿼리

컬렉션이 로드되고 데이터로 채워지면 컬렉션에서 검색 및 쿼리를 수행할 수 있습니다. 이 섹션에서는 my_collection 이라는 컬렉션에서 random 이라는 단어로 시작하는 text 필드에 있는 엔티티의 수를 찾겠습니다.

async def query_my_collection(collection_name):
    # Find the number of entities with the `text` fields starting with the word "random" in the `my_collection` collection.

    res = await async_client.query(
        collection_name="my_collection",
        filter='text like "%random%"',
        output_fields=["count(*)"]
    )

    print(res) 
    
# Call the above function asynchronously   
loop.run_until_complete(query_my_collection("my_collection"))

# Output
#
# data: ["{'count(*)': 6802}"] 

이 섹션에서는 대상 컬렉션의 고밀도 및 희소 벡터 필드에 대해 벡터 검색을 수행합니다.

async def conduct_vector_search(collection_name, type, field):
    # Generate a set of three random query vectors
    query_vectors = []
    if type == "dense":
        query_vectors = [ rng.random(5) for _ in range(3) ]
    
    if type == "sparse":
        query_vectors = [ csr_matrix(rng.random(5)) for _ in range(3) ]

    print(query_vectors)

    res = await async_client.search(
        collection_name="my_collection",
        data=query_vectors,
        anns_field=field,
        output_fields=["text", field]
    )

    print(res)
    
# To search against the dense vector field asynchronously 
loop.run_until_complete(conduct_vector_search("my_collection", "dense", "dense_vector"))

# To search against the sparse vector field asynchronously 
loop.run_until_complete(conduct_vector_search("my_collection", "sparse", "sparse_vector"))

검색 결과에는 지정된 쿼리 벡터에 해당하는 세 가지 결과 세트가 나열되어야 합니다.

하이브리드 검색은 여러 검색 결과를 결합하고 순위를 재조정하여 더 나은 검색 결과를 얻습니다. 이 섹션에서는 고밀도 및 희소 벡터 필드를 사용하여 하이브리드 검색을 수행합니다.

async def conduct_hybrid_search(collection_name):
    req_dense = AnnSearchRequest(
        data=[ rng.random(5) for _ in range(3) ],
        anns_field="dense_vector",
        param={"metric_type": "IP"},
        limit=10
    )

    req_sparse = AnnSearchRequest(
        data=[ csr_matrix(rng.random(5)) for _ in range(3) ],
        anns_field="sparse_vector",
        param={"metric_type": "IP"},
        limit=10
    )

    reqs = [req_dense, req_sparse]

    ranker = RRFRanker()

    res = await async_client.hybrid_search(
        collection_name="my_collection",
        reqs=reqs,
        ranker=ranker,
        output_fields=["text", "dense_vector", "sparse_vector"]
    )

    print(res)
    
# Call the above function asynchronously  
loop.run_until_complete(conduct_hybrid_search("my_collection"))

Try Managed Milvus for Free

Zilliz Cloud is hassle-free, powered by Milvus and 10x faster.

Get Started
피드백

이 페이지가 도움이 되었나요?