エアバイトオープンソースのデータ移動基盤
Airbyteは、抽出とロード(EL)データパイプラインを構築するためのオープンソースのデータ移動インフラストラクチャです。汎用性、拡張性、使いやすさを重視して設計されています。Airbyteのコネクタカタログには、350以上のコネクタがあらかじめ組み込まれています。これらのコネクタを使用すると、わずか数分でソースからデスティネーションへのデータ複製を開始できます。
Airbyteの主要コンポーネント
1.コネクタカタログ
- 350以上の構築済みコネクタ:Airbyteのコネクタカタログには、350以上のコネクタがあらかじめ用意されています。これらのコネクタを使用すると、わずか数分でソースからデスティネーションへのデータ複製を開始できます。
- ノーコードコネクタビルダー:No-Code Connector Builderのようなツールを使って、Airbyteの機能を簡単に拡張し、カスタムのユースケースをサポートすることができます。
2.プラットフォーム
Airbyteのプラットフォームは、クラウドマネージドまたはセルフマネージドとして利用可能な、データ移動操作の構成とスケーリングに必要なすべての水平サービスを提供します。
3.ユーザーインターフェース
Airbyteは、UI、PyAirbyte(Pythonライブラリ)、API、Terraform Providerを備えており、好みのツールやインフラ管理のアプローチと統合することができます。
Airbyteの機能により、データソースをMilvusクラスタに統合し、類似検索を行うことができます。
始める前に
必要なもの
- Zendeskアカウント(またはデータを同期したい他のデータソース)
- Airbyteアカウントまたはローカルインスタンス
- OpenAI API キー
- Milvus クラスタ
- ローカルにインストールされた Python 3.10
Milvusクラスタのセットアップ
本番用のK8sクラスタをすでにデプロイしている場合は、このステップをスキップして直接Milvus Operatorのデプロイに進むことができます。そうでない場合は、Milvus Operatorを使用してMilvusクラスタをデプロイする手順に従ってください。
個々のエンティティ(ここではサポートチケットとナレッジベース記事)は "コレクション "に保存されます - クラスタがセットアップされたら、コレクションを作成する必要があります。適切な名前を選択し、OpenAIのエンベッディングサービスによって生成されるベクトルの次元数に合わせてDimensionを1536に設定します。
作成後、エンドポイントと認証情報を記録する。
Airbyteで接続を設定する
データベースの準備ができたので、データを移動してみましょう!そのためには、Airbyteで接続を設定する必要があります。cloud.airbyte.comでAirbyteのクラウドアカウントにサインアップするか、ドキュメントに記載されているようにローカルインスタンスを起動します。
ソースのセットアップ
新規接続」をクリックし、ソースとして「Zendesk Support」コネクタを選択します。Test and Save "ボタンをクリックすると、Airbyteが接続の確立を確認します。
Airbyteクラウド上では、[認証]ボタンをクリックすることで簡単に認証できます。ローカルのAirbyteインスタンスを使用する場合は、ドキュメントページに記載されている手順に従ってください。
接続先の設定
すべて正常に動作している場合、次はデータの移動先を設定します。ここでは「Milvus」コネクタを選択します。
Milvus コネクタは以下の 3 つのことを行う:
- チャンキングとフォーマット- Zendesk レコードをテキストとメタデータに分割します。テキストが指定されたチャンクサイズより大きい場合、レコードは複数の部分に分割され、個別にコレクションに読み込まれます。テキストの分割(またはチャンキング)は、たとえば大規模なサポートチケットやナレッジ記事の場合に発生します。テキストを分割することで、検索が常に有益な結果をもたらすようにすることができます。
チャンクサイズを1000トークン、テキストフィールドをbody、title、description、subjectとします。
- 埋め込み- 機械学習モデルを使用して、処理部分によって生成されたテキストチャンクをベクトル埋め込みに変換します。埋め込みを作成するには、OpenAIのAPIキーを提供する必要があります。Airbyteは各チャンクをOpenAIに送信し、結果のベクトルをMilvusクラスタにロードされたエンティティに追加します。
- インデックス作成- チャンクをベクトル化したら、データベースにロードします。これを行うには、Milvusクラスタにクラスタとコレクションをセットアップするときに得た情報を挿入します。 Test and save "をクリックすると、すべてが正しく並んでいるかチェックされる(有効な認証情報、コレクションが存在し、設定されたエンベッディングと同じベクトル次元を持っているなど)。
ストリーム同期フローの設定
データがフローできるようになる前の最後のステップは、同期する "ストリーム "を選択することである。ストリームとは、ソース内のレコードの集まりです。Zendesk は、今回のユースケースには関係のない多数のストリームをサポートしているので、帯域幅を節約し、関連する情報だけが検索に表示されるようにするために、「チケット」と「記事」だけを選択し、他はすべて無効にしましょう:
ストリーム名をクリックすると、ソースから抽出するフィールドを選択できます。Incremental|Append+Deduped "同期モードは、ZendeskとMilvusが最小限のデータ(前回の実行以降に変更された記事とチケットのみ)を転送しながら同期を保つことを意味します。接続が設定されるとすぐに、Airbyteはデータの同期を開始します。Milvusコレクションに表示されるまで数分かかることがあります。
レプリケーションの頻度を選択した場合、Airbyteは定期的に実行され、Zendeskアーティクルの変更や新しく作成されたチケットをMilvusコレクションに反映します。
チェックフロー
MilvusクラスタUIで、プレイグラウンドに移動し、"_ab_stream == \"にフィルタを設定した "Query Data "クエリを実行することで、コレクション内のデータがどのような構造になっているかを確認することができます。
結果ビューを見るとわかるように、Zendeskから送られてきた各レコードは、指定されたすべてのメタデータとともにMilvusに個別のエンティティとして保存されています。埋め込んだテキストチャンクは、"text" プロパティとして表示されます。コレクションをクエリするStreamlitアプリのビルド
データの準備は整いました - 次はそれを使うアプリケーションをビルドする必要があります。この場合、アプリケーションはユーザーがサポートケースを提出するためのシンプルなサポートフォームになります。ユーザーが送信をクリックすると、次の2つのことを行います:
- 同じ組織のユーザによって提出された類似のチケットを検索する。
- ユーザーに関連しそうな知識ベースの記事を検索する。
どちらの場合も、OpenAIのエンベッディングを使ったセマンティック検索を活用します。これを行うために、ユーザが入力した問題の説明も埋め込まれ、Milvusクラスタから類似のエンティティを検索するために使用されます。関連する結果があれば、フォームの下に表示されます。
UI環境のセットアップ
アプリケーションの実装にStreamlitを使用するため、ローカルにPythonをインストールする必要があります。
まず、Streamlit、Milvusクライアントライブラリ、OpenAIクライアントライブラリをローカルにインストールします:
pip install streamlit pymilvus openai
基本的なサポートフォームをレンダリングするために、Pythonファイルbasic_support_form.py
を作成します:
import streamlit as st
with st.form("my_form"):
st.write("Submit a support case")
text_val = st.text_area("Describe your problem")
submitted = st.form_submit_button("Submit")
if submitted:
# TODO check for related support cases and articles
st.write("Submitted!")
アプリケーションを実行するには、Streamlit runを使います:
streamlit run basic_support_form.py
これで基本フォームがレンダリングされます:
この例のコードはGitHubにもあります。バックエンドクエリーサービスのセットアップ
次に、関連しそうな既存のオープンチケットをチェックしてみましょう。これを行うために、OpenAI を使ってユーザが入力したテキストを埋め込み、コレクションで類似検索を行い、まだ開いているチケットをフィルタリングします。もし、入力されたチケットと既存のチケットの間の距離が非常に低いものがあれば、ユーザに知らせ、送信しないようにします:
import streamlit as st
import os
import pymilvus
import openai
with st.form("my_form"):
st.write("Submit a support case")
text_val = st.text_area("Describe your problem?")
submitted = st.form_submit_button("Submit")
if submitted:
import os
import pymilvus
import openai
org_id = 360033549136 # TODO Load from customer login data
pymilvus.connections.connect(uri=os.environ["MILVUS_URL"], token=os.environ["MILVUS_TOKEN"])
collection = pymilvus.Collection("zendesk")
embedding = openai.Embedding.create(input=text_val, model="text-embedding-ada-002")['data'][0]['embedding']
results = collection.search(data=[embedding], anns_field="vector", param={}, limit=2, output_fields=["_id", "subject", "description"], expr=f'status == "new" and organization_id == {org_id}')
st.write(results[0])
if len(results[0]) > 0 and results[0].distances[0] < 0.35:
matching_ticket = results[0][0].entity
st.write(f"This case seems very similar to {matching_ticket.get('subject')} (id #{matching_ticket.get('_id')}). Make sure it has not been submitted before")
else:
st.write("Submitted!")
ここでいくつかのことが起こっています:
- Milvus クラスタへの接続が設定されます。
- OpenAIサービスは、ユーザが入力した説明の埋め込みを生成するために使用されます。
- 類似検索が実行され、チケットのステータスと組織 ID によって結果がフィルタリングされます (同じ組織のオープンチケットのみが関連するため)。
- 結果があり、既存のチケットと新しく入力されたテキストの埋め込みベクトル間の距離がある閾値以下であれば、この事実を呼び出します。
新しいアプリを実行するには、まず OpenAI と Milvus の環境変数を設定する必要があります:
export MILVUS_TOKEN=...
export MILVUS_URL=https://...
export OPENAI_API_KEY=sk-...
streamlit run app.py
すでに存在するチケットを送信しようとすると、このようになります:
この例のコードはGitHub にもあります。より多くの関連情報を表示する
最終バージョンに隠された緑色のデバッグ出力でわかるように、2つのチケットが私たちの検索にマッチしました(ステータスが新規で、現在の組織からで、埋め込みベクトルに近い)。しかし、最初のチケット(関連)は2番目のチケット(この状況では無関係)よりも上位にランクされ、それは低い距離値に反映されています。この関係は、通常の全文検索のように、単語を直接マッチングさせることなく、埋め込みベクトルに取り込まれる。
最後に、チケットの提出後に役立つ情報を表示して、ユーザにできるだけ多くの関連情報を前もって与えましょう。
そのために、チケットが送信された後に2回目の検索を行い、上位にマッチするナレッジベースの記事を取得します:
......
else:
# TODO Actually send out the ticket
st.write("Submitted!")
article_results = collection.search(data=[embedding], anns_field="vector", param={}, limit=5, output_fields=["title", "html_url"], expr=f'_ab_stream == "articles"')
st.write(article_results[0])
if len(article_results[0]) > 0:
st.write("We also found some articles that might help you:")
for hit in article_results[0]:
if hit.distance < 0.362:
st.write(f"* [{hit.entity.get('title')}]({hit.entity.get('html_url')})")
高い類似度スコアを持つオープンサポートチケットがない場合、新しいチケットが送信され、関連するナレッジ記事が下に表示されます:
この例のコードはGithub にもあります。結論
ここに示したUIは実際のサポートフォームではなく、ユースケースを説明するための一例ですが、AirbyteとMilvusの組み合わせは非常に強力なものです - さまざまなソース(ZendeskやGitHubのようなAPI、Postgresのようなデータベースから、AirbyteのSDKやビジュアルコネクタビルダーを使用して構築された完全なカスタムソースまで)からテキストを読み込んで、Milvusに埋め込まれた形でインデックスを作成することが簡単にできます。
AirbyteとMilvusはオープンソースであり、お客様のインフラストラクチャ上で完全に無料で使用することができます。
この記事で説明されている古典的なセマンティック検索の使用例以外にも、一般的なセットアップは、RAGメソッド(Retrieval Augmented Generation)を使用した質問応答チャットボットや、レコメンダーシステムを構築したり、より適切で効率的な広告を作成したりするのにも使用できる。