Airbyte:開放原始碼資料移動基礎架構
Airbyte 是一套開放原始碼的資料移動基礎架構,用於建立抽取與載入 (EL) 資料管道。它的設計具有多功能性、可擴展性和易用性。Airbyte 的連接器目錄「開箱即用」,包含 350 多個預先建立的連接器。這些連接器可用於在短短幾分鐘內將資料從來源複製到目的地。
Airbyte 的主要組成部分
1.連接器目錄
- 350+ 預建連接器:Airbyte 的連接器目錄 「開箱即用」,包含 350 多個預建連接器。這些連接器可用於在短短幾分鐘內將資料從來源複製到目的地。
- No-Code Connector Builder:您可以透過No-Code Connector Builder 等工具輕鬆擴展 Airbyte 的功能,以支援您的自訂用例。
2.平台
Airbyte 的平台提供配置和擴展資料移動作業所需的所有水平服務,可選擇雲端管理或自我管理。
3.使用者介面
Airbyte 具備使用者介面、PyAirbyte(Python 函式庫)、API 和Terraform Provider,可與您偏好的工具和基礎架構管理方法整合。
透過 Airbyte 的能力,使用者可以將資料來源整合至 Milvus 叢集,進行相似性搜尋。
開始之前
您將需要
- Zendesk 帳戶 (或其他您想要同步資料的資料來源)
- Airbyte 帳戶或本機實例
- OpenAI API 金鑰
- Milvus 集群
- 本機已安裝 Python 3.10
設定 Milvus 叢集
如果您已經為生產部署了 K8s 集群,您可以跳過此步驟,直接部署 Milvus Operator。如果沒有,您可以按照步驟使用 Milvus Operator 部署 Milvus 集群。
個別實體 (在我們的例子中,支援票單和知識庫文章) 儲存在「集合」中 - 在您的群集設定完成後,您需要建立一個集合。選擇一個合適的名稱,並將 Dimension 設定為 1536,以符合 OpenAI embeddings 服務所產生的向量維度。
建立後,記錄端點和驗證資訊。
在 Airbyte 中設定連線
我們的資料庫已準備就緒,讓我們移動一些資料過去!为此,我们需要在 Airbyte 中配置连接。您可以在cloud.airbyte.com註冊一個 Airbyte 雲端帳戶,或按照文件中所述啟動一個本地實例。
設定來源
实例运行后,我们需要设置连接 - 单击 "New connection「(新建连接)并选择 」Zendesk Support "连接器作为源。单击 "Test and Save"(测试并保存)按钮后,Airbyte 将检查是否可以建立连接。
在 Airbyte 云上,您可以通过单击 「验证 」按钮轻松进行验证。使用本機 Airbyte 實例時,請遵循文件頁面上概述的指示。
設定目的地
如果一切運作正常,下一步就是設定要移動資料的目的地。在此,選擇「Milvus」連接器。
Milvus 連接器可做三件事:
- 分塊和格式化- 將 Zendesk 記錄分割為文字和元資料。如果文字大於指定的分塊大小,記錄會被分割成多個部分,並單獨載入資料集中。例如,分割文字(或分塊)可能發生在大型支援票單或知識文章的情況。透過分割文字,您可以確保搜尋總是能得到有用的結果。
讓我們使用 1000 個標記的分塊大小,以及 body、title、description 和 subject 等文字欄位,因為這些都會出現在我們從 Zendesk 收到的資料中。
- 嵌入- 使用機器學習模型將處理部分產生的文字塊轉換為向量嵌入,然後您可以搜尋其語意相似性。若要建立嵌入,您必須提供 OpenAI API 金鑰。Airbyte 將傳送每個chunk 到 OpenAI,並將產生的向量加入載入您 Milvus 叢集的實體中。
- 索引- 一旦您將小塊向量化,您就可以將它們載入資料庫。為此,請插入您在 Milvus 叢集中設定叢集和集合時所獲得的資訊。 點選「測試並儲存」將檢查一切是否正確排列(有效憑證、集合存在且與設定的嵌入具有相同的向量維度等)。
設定串流同步流程
資料準備好流動之前的最後一個步驟是選擇要同步的「串流」。流是來源中記錄的集合。由於 Zendesk 支援大量與我們的使用案例不相關的串流,讓我們只選擇「票單」和「文章」,禁用所有其他串流,以節省頻寬,並確保只有相關資訊會顯示在搜尋中:

連接一經建立,Airbyte 即會開始同步資料。它可能需要幾分鐘才能出現在您的 Milvus 收集中。
如果您選擇複製頻率,Airbyte 會定期執行,以保持您的 Milvus 收集與 Zendesk 文章和新建立問題的變更同步。
檢查流程
您可以在 Milvus 集群 UI 中检查数据在集合中的结构,方法是导航到 playground 并执行 "Query Data 「查询,过滤器设置为」_ab_stream == \"ticket/""。

建立 Streamlit 應用程式查詢資料集
我們的資料已經準備就緒 - 現在我們需要建立應用程式來使用它。在這種情況下,應用程式將會是一個簡單的支援表單,供使用者提交支援個案。當使用者按下提交時,我們會做兩件事:
- 搜尋同一組織的使用者所提交的類似票單
- 搜尋可能與使用者相關的知識型文章
在這兩種情況下,我們都會使用 OpenAI 內嵌利用語意搜尋。為此,使用者輸入的問題描述也會被嵌入,並用於從 Milvus 叢集中擷取相似的實體。如果有相關的結果,就會顯示在表單下方。
設定 UI 環境
您需要本機安裝 Python,因為我們會使用 Streamlit 來實作應用程式。
首先,在本機安裝 Streamlit、Milvus 客戶端函式庫和 OpenAI 客戶端函式庫:
pip install streamlit pymilvus openai
若要渲染基本的支援表單,請建立一個 python 檔案basic_support_form.py
:
import streamlit as st
with st.form("my_form"):
st.write("Submit a support case")
text_val = st.text_area("Describe your problem")
submitted = st.form_submit_button("Submit")
if submitted:
# TODO check for related support cases and articles
st.write("Submitted!")
要執行應用程式,請使用 Streamlit run:
streamlit run basic_support_form.py
這將會渲染一個基本的表單:

設定後端查詢服務
接下來,讓我們檢查可能相關的現有開狀。為了做到這一點,我們使用 OpenAI 嵌入使用者輸入的文字,然後在我們的集合中進行相似性搜尋,過濾仍未結案的票單。如果有一個提供的票單與現有票單之間的距離很低,就會讓使用者知道,並且不要提交:
import streamlit as st
import os
import pymilvus
import openai
with st.form("my_form"):
st.write("Submit a support case")
text_val = st.text_area("Describe your problem?")
submitted = st.form_submit_button("Submit")
if submitted:
import os
import pymilvus
import openai
org_id = 360033549136 # TODO Load from customer login data
pymilvus.connections.connect(uri=os.environ["MILVUS_URL"], token=os.environ["MILVUS_TOKEN"])
collection = pymilvus.Collection("zendesk")
embedding = openai.Embedding.create(input=text_val, model="text-embedding-ada-002")['data'][0]['embedding']
results = collection.search(data=[embedding], anns_field="vector", param={}, limit=2, output_fields=["_id", "subject", "description"], expr=f'status == "new" and organization_id == {org_id}')
st.write(results[0])
if len(results[0]) > 0 and results[0].distances[0] < 0.35:
matching_ticket = results[0][0].entity
st.write(f"This case seems very similar to {matching_ticket.get('subject')} (id #{matching_ticket.get('_id')}). Make sure it has not been submitted before")
else:
st.write("Submitted!")
這裡發生了幾件事情:
- 與 Milvus 叢集的連線已建立。
- 使用 OpenAI 服務產生使用者輸入描述的嵌入。
- 執行相似性搜尋,依據票單狀態和組織 ID 篩選結果(因為只有同一組織的開放票單才相關)。
- 如果有結果,且現有票單的嵌入向量與新輸入文字的嵌入向量之間的距離低於某個臨界值,則會指出這個事實。
要執行新的應用程式,需要先為 OpenAI 和 Milvus 設定環境變數:
export MILVUS_TOKEN=...
export MILVUS_URL=https://...
export OPENAI_API_KEY=sk-...
streamlit run app.py
當嘗試提交已經存在的票單,結果會是這樣:

顯示更多相關資訊
正如您在隱藏於最終版本中的綠色除錯輸出中所看到的,有兩張票單符合我們的搜尋(狀態為新、來自目前的組織,且接近嵌入向量)。但是,第一張 (相關) 的排名高於第二張 (在此情況下不相關),這反映在較低的距離值上。這種關係在嵌入向量中被捕捉到,而不會像一般全文檢索一樣直接匹配字詞。
總結一下,讓我們在提交票單之後顯示有用的資訊,儘可能在最前面提供使用者相關資訊。
為此,我們會在提交票單後進行第二次搜尋,以取得最匹配的知識庫文章:
......
else:
# TODO Actually send out the ticket
st.write("Submitted!")
article_results = collection.search(data=[embedding], anns_field="vector", param={}, limit=5, output_fields=["title", "html_url"], expr=f'_ab_stream == "articles"')
st.write(article_results[0])
if len(article_results[0]) > 0:
st.write("We also found some articles that might help you:")
for hit in article_results[0]:
if hit.distance < 0.362:
st.write(f"* [{hit.entity.get('title')}]({hit.entity.get('html_url')})")
如果沒有相似度高的開放支援票單,則會提交新票單,並在下方顯示相關的知識文章:

總結
雖然這裡所顯示的 UI 並非實際的支援表單,而是用來說明使用個案的範例,但 Airbyte 與 Milvus 的結合是非常強大的 - 它可以輕鬆地從各種來源載入文字(從像 Postgres 之類的資料庫,到像 Zendesk 或 GitHub 之類的 API,再到使用 Airbyte 的 SDK 或視覺連接器建立器建立的完全自訂來源),並在 Milvus 中以內嵌的形式進行索引,Milvus 是一個強大的向量搜尋引擎,能夠擴展至龐大的資料量。
Airbyte 和 Milvus 是開放原始碼,可完全免費在您的基礎架構上使用,如果需要,還可透過雲端服務來卸載作業。
除了本文所說明的經典語意搜尋用例外,一般的設定也可用於使用 RAG 方法 (Retrieval Augmented Generation) 建立問題解答聊天機器人、推薦系統,或協助提高廣告的相關性與效率。