Airbyte: Инфраструктура перемещения данных с открытым исходным кодом
Airbyte - это инфраструктура перемещения данных с открытым исходным кодом для создания конвейеров извлечения и загрузки данных (EL). Она разработана для обеспечения универсальности, масштабируемости и простоты использования. Каталог коннекторов Airbyte поставляется "из коробки" с более чем 350 предварительно созданными коннекторами. С помощью этих коннекторов можно запустить репликацию данных из источника в пункт назначения всего за несколько минут.
Основные компоненты Airbyte
1. Каталог коннекторов
- 350+ готовых коннекторов: Каталог коннекторов Airbyte поставляется "из коробки" с более чем 350 готовыми коннекторами. Эти коннекторы можно использовать, чтобы начать репликацию данных из источника в пункт назначения всего за несколько минут.
- No-Code Connector Builder: Вы можете легко расширить функциональность Airbyte для поддержки ваших пользовательских сценариев использования с помощью таких инструментов, как No-Code Connector Builder.
2. Платформа
Платформа Airbyte предоставляет все горизонтальные сервисы, необходимые для настройки и масштабирования операций по перемещению данных, и может быть как облачной, так и самоуправляемой.
3. Пользовательский интерфейс
Airbyte имеет пользовательский интерфейс, библиотеку PyAirbyte (Python), API и Terraform Provider для интеграции с предпочитаемыми вами инструментами и подходами к управлению инфраструктурой.
Благодаря возможностям Airbyte пользователи могут интегрировать источники данных в кластер Milvus для поиска по сходству.
Прежде чем начать
Вам понадобятся:
- учетная запись Zendesk (или другой источник данных, с которым вы хотите синхронизировать данные)
- учетная запись Airbyte или локальный экземпляр
- API-ключ OpenAI
- Кластер Milvus
- Python 3.10, установленный локально
Настройка кластера Milvus
Если вы уже развернули кластер K8s для производства, вы можете пропустить этот шаг и перейти непосредственно к развертыванию Milvus Operator. В противном случае вы можете выполнить шаги по развертыванию кластера Milvus с помощью Milvus Operator.
Отдельные сущности (в нашем случае это тикеты поддержки и статьи базы знаний) хранятся в "коллекции" - после настройки кластера вам нужно создать коллекцию. Выберите подходящее имя и установите значение Dimension равным 1536, чтобы соответствовать размерности вектора, генерируемого сервисом встраивания OpenAI.
После создания запишите информацию о конечной точке и аутентификации.
Настройка соединения в Airbyte
Наша база данных готова, давайте перенесем туда данные! Для этого нам нужно настроить соединение в Airbyte. Либо зарегистрируйте облачный аккаунт Airbyte на cloud.airbyte.com, либо запустите локальный экземпляр, как описано в документации.
Настройка источника
После того как ваш экземпляр запущен, нам нужно настроить соединение - нажмите "Новое соединение" и выберите коннектор "Zendesk Support" в качестве источника. После нажатия кнопки "Проверить и сохранить" Airbyte проверит, может ли быть установлено соединение.
В облаке Airbyte вы можете легко пройти аутентификацию, нажав кнопку Authenticate. При использовании локального экземпляра Airbyte следуйте инструкциям, описанным на странице документации.
Настройка пункта назначения
Если все работает правильно, следующим шагом будет настройка места назначения для перемещения данных. Здесь выберите коннектор "Milvus".
Коннектор Milvus выполняет три задачи:
- Разбивка и форматирование - Разбивает записи Zendesk на текст и метаданные. Если размер текста превышает заданный размер чанка, записи разбиваются на несколько частей, которые загружаются в коллекцию по отдельности. Разбивка текста (или чанкинг) может, например, происходить в случае больших тикетов поддержки или статей знаний. Разбив текст на части, можно добиться того, что поиск всегда будет приносить полезные результаты.
Возьмем размер фрагмента в 1000 лексем и текстовые поля body, title, description и subject, поскольку именно они будут присутствовать в данных, которые мы получим из Zendesk.
- Встраивание - с помощью моделей машинного обучения текстовые фрагменты, полученные в результате обработки, преобразуются в векторные вкрапления, которые затем можно искать на предмет семантического сходства. Для создания вкраплений необходимо предоставить ключ OpenAI API. Airbyte отправит каждый чанк в OpenAI и добавит полученный вектор к сущностям, загруженным в ваш кластер Milvus.
- Индексирование - После того как вы векторизировали чанки, вы можете загрузить их в базу данных. Для этого вставьте информацию, которую вы получили при настройке кластера и коллекции в кластере Milvus. Нажав кнопку "Проверить и сохранить", вы проверите, все ли правильно выстроено (правильные учетные данные, коллекция существует и имеет ту же векторную размерность, что и настроенное вложение, и т. д.).
Настройка потока синхронизации
Последний шаг перед тем, как данные будут готовы к потоку, - выбор "потоков" для синхронизации. Поток - это коллекция записей в источнике. Поскольку Zendesk поддерживает большое количество потоков, которые не имеют отношения к нашему случаю использования, давайте выберем только "билеты" и "статьи" и отключим все остальные, чтобы сэкономить полосу пропускания и убедиться, что в поиске будет отображаться только релевантная информация:

Как только соединение будет установлено, Airbyte начнет синхронизацию данных. Их появление в коллекции Milvus может занять несколько минут.
Если вы выберете частоту репликации, Airbyte будет запускаться регулярно, чтобы поддерживать коллекцию Milvus в актуальном состоянии с учетом изменений в статьях Zendesk и вновь созданных проблемах.
Поток проверок
Вы можете проверить в пользовательском интерфейсе кластера Milvus, как структурированы данные в коллекции, перейдя на игровую площадку и выполнив запрос "Query Data" с фильтром, установленным на "_ab_stream == \"tickets\"".

Создание приложения Streamlit для запроса коллекции
Наши данные готовы - теперь нам нужно создать приложение для их использования. В данном случае приложение будет представлять собой простую форму поддержки, в которую пользователи могут отправлять заявки. Когда пользователь нажмет кнопку "Отправить", мы сделаем две вещи:
- поиск похожих заявок, поданных пользователями той же организации
- Поиск статей, основанных на знаниях, которые могут быть релевантны пользователю.
В обоих случаях мы будем использовать семантический поиск с помощью вкраплений OpenAI. Для этого описание проблемы, введенное пользователем, также встраивается и используется для извлечения похожих сущностей из кластера Milvus. Если есть соответствующие результаты, они отображаются под формой.
Настройка среды пользовательского интерфейса
Вам понадобится локальная установка Python, так как мы будем использовать Streamlit для реализации приложения.
Сначала установите Streamlit, клиентскую библиотеку Milvus и клиентскую библиотеку OpenAI локально:
pip install streamlit pymilvus openai
Чтобы отобразить базовую форму поддержки, создайте python-файл basic_support_form.py
:
import streamlit as st
with st.form("my_form"):
st.write("Submit a support case")
text_val = st.text_area("Describe your problem")
submitted = st.form_submit_button("Submit")
if submitted:
# TODO check for related support cases and articles
st.write("Submitted!")
Чтобы запустить приложение, используйте Streamlit run:
streamlit run basic_support_form.py
Это приведет к отрисовке базовой формы:

Настройка службы запросов бэкенда
Далее проверим существующие открытые тикеты, которые могут быть релевантны. Для этого мы внедрили текст, введенный пользователем с помощью OpenAI, затем выполнили поиск по сходству в нашей коллекции, отфильтровав все еще открытые билеты. Если найдется такой, в котором расстояние между введенным тикетом и существующим очень мало, сообщите об этом пользователю и не отправляйте:
import streamlit as st
import os
import pymilvus
import openai
with st.form("my_form"):
st.write("Submit a support case")
text_val = st.text_area("Describe your problem?")
submitted = st.form_submit_button("Submit")
if submitted:
import os
import pymilvus
import openai
org_id = 360033549136 # TODO Load from customer login data
pymilvus.connections.connect(uri=os.environ["MILVUS_URL"], token=os.environ["MILVUS_TOKEN"])
collection = pymilvus.Collection("zendesk")
embedding = openai.Embedding.create(input=text_val, model="text-embedding-ada-002")['data'][0]['embedding']
results = collection.search(data=[embedding], anns_field="vector", param={}, limit=2, output_fields=["_id", "subject", "description"], expr=f'status == "new" and organization_id == {org_id}')
st.write(results[0])
if len(results[0]) > 0 and results[0].distances[0] < 0.35:
matching_ticket = results[0][0].entity
st.write(f"This case seems very similar to {matching_ticket.get('subject')} (id #{matching_ticket.get('_id')}). Make sure it has not been submitted before")
else:
st.write("Submitted!")
Здесь происходит несколько вещей:
- Устанавливается соединение с кластером Milvus.
- Используется сервис OpenAI для создания вставки описания, введенного пользователем.
- Выполняется поиск сходства, фильтруя результаты по статусу тикета и идентификатору организации (поскольку релевантными являются только открытые тикеты одной организации).
- Если есть результаты и расстояние между векторами вкраплений существующего тикета и вновь введенного текста ниже определенного порога, об этом сообщается.
Чтобы запустить новое приложение, необходимо сначала установить переменные окружения для OpenAI и Milvus:
export MILVUS_TOKEN=...
export MILVUS_URL=https://...
export OPENAI_API_KEY=sk-...
streamlit run app.py
При попытке отправить билет, который уже существует, результат будет выглядеть так:

Показывать больше релевантной информации
Как видно из зеленого отладочного вывода, скрытого в финальной версии, два тикета соответствовали нашему поиску (в статусе new, от текущей организации и близко к вектору встраивания). Однако первый из них (релевантный) ранжировался выше, чем второй (нерелевантный в данной ситуации), что отражается в меньшем значении расстояния. Эта связь фиксируется в векторах встраивания без прямого сопоставления слов, как при обычном полнотекстовом поиске.
В заключение давайте покажем полезную информацию после отправки билета, чтобы предоставить пользователю как можно больше релевантной информации.
Для этого мы выполним второй поиск после отправки тикета, чтобы получить наиболее подходящие статьи базы знаний:
......
else:
# TODO Actually send out the ticket
st.write("Submitted!")
article_results = collection.search(data=[embedding], anns_field="vector", param={}, limit=5, output_fields=["title", "html_url"], expr=f'_ab_stream == "articles"')
st.write(article_results[0])
if len(article_results[0]) > 0:
st.write("We also found some articles that might help you:")
for hit in article_results[0]:
if hit.distance < 0.362:
st.write(f"* [{hit.entity.get('title')}]({hit.entity.get('html_url')})")
Если нет открытого тикета поддержки с высоким показателем сходства, новый тикет отправляется, а соответствующие статьи базы знаний отображаются ниже:

Заключение
Хотя представленный здесь пользовательский интерфейс - это не настоящая форма поддержки, а пример, иллюстрирующий вариант использования, сочетание Airbyte и Milvus очень мощное - оно позволяет легко загружать текст из самых разных источников (от баз данных типа Postgres, API типа Zendesk или GitHub до полностью пользовательских источников, созданных с помощью SDK или визуального конструктора коннекторов Airbyte) и индексировать его во встроенном виде в Milvus, мощной векторной поисковой системе, способной масштабироваться до огромных объемов данных.
Airbyte и Milvus имеют открытый исходный код и совершенно бесплатны для использования в вашей инфраструктуре, при желании можно воспользоваться облачными предложениями для разгрузки операций.
Помимо классического варианта использования семантического поиска, описанного в этой статье, общая схема может быть использована для создания чат-бота, отвечающего на вопросы по методу RAG (Retrieval Augmented Generation), рекомендательных систем или для повышения релевантности и эффективности рекламы.