Создайте RAG с помощью Milvus и SiliconFlow
SiliconFlow стремится создать масштабируемую, стандартизированную и высокопроизводительную инфраструктурную платформу ИИ. SiliconCloud - одно из флагманских предложений SiliconFlow, описываемое как платформа "модель как услуга" (MaaS). Она предоставляет комплексную среду для развертывания различных моделей ИИ, включая большие языковые модели (LLM) и модели встраивания. SiliconCloud объединяет множество моделей с открытым исходным кодом, позволяя пользователям легко получать доступ к этим ресурсам и использовать их без необходимости создания обширной инфраструктуры.
В этом руководстве мы покажем вам, как построить конвейер RAG (Retrieval-Augmented Generation) с помощью Milvus и SiliconFlow.
Подготовка
Зависимости и окружение
$ pip install --upgrade pymilvus openai requests tqdm
Если вы используете Google Colab, то для включения только что установленных зависимостей вам может потребоваться перезапустить среду выполнения (нажмите на меню "Runtime" в верхней части экрана и выберите "Restart session" из выпадающего меню).
SiliconFlow позволяет использовать API в стиле OpenAI. Вы можете зайти на его официальный сайт и подготовить api ключ SILICON_FLOW_API_KEY
в качестве переменной окружения.
import os
os.environ["SILICON_FLOW_API_KEY"] = "***********"
Подготовьте данные
Мы используем страницы FAQ из Milvus Documentation 2.4.x в качестве приватного знания в нашем RAG, который является хорошим источником данных для простого RAG-конвейера.
Скачайте zip-файл и распакуйте документы в папку milvus_docs
.
$ wget https://github.com/milvus-io/milvus-docs/releases/download/v2.4.6-preview/milvus_docs_2.4.x_en.zip
$ unzip -q milvus_docs_2.4.x_en.zip -d milvus_docs
Мы загружаем все файлы разметки из папки milvus_docs/en/faq
. Для каждого документа мы просто используем "# " для разделения содержимого в файле, что позволяет примерно разделить содержимое каждой основной части файла разметки.
from glob import glob
text_lines = []
for file_path in glob("milvus_docs/en/faq/*.md", recursive=True):
with open(file_path, "r") as file:
file_text = file.read()
text_lines += file_text.split("# ")
Подготовка модели встраивания
Мы инициализируем клиента, чтобы подготовить модель встраивания. SiliconFlow позволяет использовать API в стиле OpenAI, и вы можете использовать этот же API с небольшими изменениями для вызова модели встраивания и LLM.
from openai import OpenAI
siliconflow_client = OpenAI(
api_key=os.environ["SILICON_FLOW_API_KEY"], base_url="https://api.siliconflow.cn/v1"
)
Определите функцию для генерации текстовых вкраплений с помощью клиента. В качестве примера мы используем модель BAAI/bge-large-en-v1.5
.
def emb_text(text):
return (
siliconflow_client.embeddings.create(input=text, model="BAAI/bge-large-en-v1.5")
.data[0]
.embedding
)
Сгенерируйте тестовое вкрапление и выведите его размерность и первые несколько элементов.
test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
1024
[0.011475468054413795, 0.02982141077518463, 0.0038535362109541893, 0.035921916365623474, -0.0159175843000412, -0.014918108470737934, -0.018094222992658615, -0.002937349723652005, 0.030917132273316383, 0.03390815854072571]
Загрузка данных в Milvus
Создайте коллекцию
from pymilvus import MilvusClient
milvus_client = MilvusClient(uri="./milvus_demo.db")
collection_name = "my_rag_collection"
Что касается аргумента MilvusClient
:
- Установка
uri
в качестве локального файла, например./milvus.db
, является наиболее удобным методом, поскольку он автоматически использует Milvus Lite для хранения всех данных в этом файле. - Если у вас большой объем данных, вы можете настроить более производительный сервер Milvus на docker или kubernetes. В этом случае используйте ури сервера, например
http://localhost:19530
, в качествеuri
. - Если вы хотите использовать Zilliz Cloud, полностью управляемый облачный сервис для Milvus, настройте
uri
иtoken
, которые соответствуют публичной конечной точке и ключу Api в Zilliz Cloud.
Проверьте, не существует ли уже коллекция, и удалите ее, если она существует.
if milvus_client.has_collection(collection_name):
milvus_client.drop_collection(collection_name)
Создайте новую коллекцию с указанными параметрами.
Если мы не укажем информацию о полях, Milvus автоматически создаст поле по умолчанию id
для первичного ключа и поле vector
для хранения векторных данных. Зарезервированное поле JSON используется для хранения не определенных схемой полей и их значений.
milvus_client.create_collection(
collection_name=collection_name,
dimension=embedding_dim,
metric_type="IP", # Inner product distance
consistency_level="Strong", # Strong consistency level
)
Вставка данных
Пройдитесь по текстовым строкам, создайте вкрапления, а затем вставьте данные в Milvus.
Вот новое поле text
, которое является неопределенным полем в схеме коллекции. Оно будет автоматически добавлено в зарезервированное динамическое поле JSON, с которым можно обращаться как с обычным полем на высоком уровне.
from tqdm import tqdm
data = []
for i, line in enumerate(tqdm(text_lines, desc="Creating embeddings")):
data.append({"id": i, "vector": emb_text(line), "text": line})
milvus_client.insert(collection_name=collection_name, data=data)
Creating embeddings: 100%|██████████| 72/72 [00:04<00:00, 16.97it/s]
{'insert_count': 72, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71], 'cost': 0}
Построение RAG
Получение данных для запроса
Давайте зададим частый вопрос о Milvus.
question = "How is data stored in milvus?"
Найдем этот вопрос в коллекции и получим семантический топ-3 совпадений.
search_res = milvus_client.search(
collection_name=collection_name,
data=[
emb_text(question)
], # Use the `emb_text` function to convert the question to an embedding vector
limit=3, # Return top 3 results
search_params={"metric_type": "IP", "params": {}}, # Inner product distance
output_fields=["text"], # Return the text field
)
Давайте посмотрим на результаты поиска по этому запросу.
import json
retrieved_lines_with_distances = [
(res["entity"]["text"], res["distance"]) for res in search_res[0]
]
print(json.dumps(retrieved_lines_with_distances, indent=4))
[
[
" Where does Milvus store data?\n\nMilvus deals with two types of data, inserted data and metadata. \n\nInserted data, including vector data, scalar data, and collection-specific schema, are stored in persistent storage as incremental log. Milvus supports multiple object storage backends, including [MinIO](https://min.io/), [AWS S3](https://aws.amazon.com/s3/?nc1=h_ls), [Google Cloud Storage](https://cloud.google.com/storage?hl=en#object-storage-for-companies-of-all-sizes) (GCS), [Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs), [Alibaba Cloud OSS](https://www.alibabacloud.com/product/object-storage-service), and [Tencent Cloud Object Storage](https://www.tencentcloud.com/products/cos) (COS).\n\nMetadata are generated within Milvus. Each Milvus module has its own metadata that are stored in etcd.\n\n###",
0.833885133266449
],
[
"How does Milvus flush data?\n\nMilvus returns success when inserted data are loaded to the message queue. However, the data are not yet flushed to the disk. Then Milvus' data node writes the data in the message queue to persistent storage as incremental logs. If `flush()` is called, the data node is forced to write all data in the message queue to persistent storage immediately.\n\n###",
0.812842607498169
],
[
"Does the query perform in memory? What are incremental data and historical data?\n\nYes. When a query request comes, Milvus searches both incremental data and historical data by loading them into memory. Incremental data are in the growing segments, which are buffered in memory before they reach the threshold to be persisted in storage engine, while historical data are from the sealed segments that are stored in the object storage. Incremental data and historical data together constitute the whole dataset to search.\n\n###",
0.7714196443557739
]
]
Использование LLM для получения ответа RAG
Преобразуйте полученные документы в строковый формат.
context = "\n".join(
[line_with_distance[0] for line_with_distance in retrieved_lines_with_distances]
)
Определите системные и пользовательские подсказки для модели Lanage. Эта подсказка собрана с полученными документами из Milvus.
SYSTEM_PROMPT = """
Human: You are an AI assistant. You are able to find answers to the questions from the contextual passage snippets provided.
"""
USER_PROMPT = f"""
Use the following pieces of information enclosed in <context> tags to provide an answer to the question enclosed in <question> tags.
<context>
{context}
</context>
<question>
{question}
</question>
"""
Используйте модель deepseek-ai/DeepSeek-V2.5
, предоставленную SiliconCloud, чтобы сгенерировать ответ на основе подсказок.
response = siliconflow_client.chat.completions.create(
model="deepseek-ai/DeepSeek-V2.5",
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": USER_PROMPT},
],
)
print(response.choices[0].message.content)
In Milvus, data is stored in two main categories: inserted data and metadata.
- **Inserted Data**: This includes vector data, scalar data, and collection-specific schema, which are stored in persistent storage as incremental logs. Milvus supports various object storage backends such as MinIO, AWS S3, Google Cloud Storage (GCS), Azure Blob Storage, Alibaba Cloud OSS, and Tencent Cloud Object Storage (COS).
- **Metadata**: This is generated within Milvus, with each module having its own metadata stored in etcd, a distributed key-value store.
Отлично! Мы успешно построили конвейер RAG с помощью Milvus и SiliconFlow.