milvus-logo
LFAI
Casa
  • Integrazioni
    • Fonti dei dati

Airbyte: Infrastruttura open source per la movimentazione dei dati

Airbyte è un'infrastruttura open-source per la movimentazione dei dati per la creazione di pipeline di estrazione e caricamento (EL). È progettata per garantire versatilità, scalabilità e facilità d'uso. Il catalogo di connettori di Airbyte viene fornito "out-of-the-box" con oltre 350 connettori precostituiti. Questi connettori possono essere utilizzati per iniziare a replicare i dati da un'origine a una destinazione in pochi minuti.

Componenti principali di Airbyte

1. Catalogo dei connettori

  • Oltre 350 connettori precostituiti: Il catalogo dei connettori di Airbyte viene fornito "out-of-the-box" con oltre 350 connettori precostituiti. Questi connettori possono essere utilizzati per iniziare a replicare i dati da un'origine a una destinazione in pochi minuti.
  • Creazione di connettori senza codice: È possibile estendere facilmente le funzionalità di Airbyte per supportare i propri casi d'uso personalizzati attraverso strumenti come il No-Code Connector Builder.

2. La piattaforma

La piattaforma di Airbyte fornisce tutti i servizi orizzontali necessari per configurare e scalare le operazioni di movimento dei dati, disponibili come gestiti dal cloud o autogestiti.

3. L'interfaccia utente

Airbyte dispone di un'interfaccia utente, di PyAirbyte (libreria Python), di API e di Terraform Provider per integrarsi con gli strumenti e gli approcci preferiti alla gestione dell'infrastruttura.

Grazie ad Airbyte, gli utenti possono integrare le fonti di dati nel cluster Milvus per la ricerca di similarità.

Prima di iniziare

Sono necessari

  • account Zendesk (o un'altra fonte di dati da cui si desidera sincronizzare i dati)
  • Account Airbyte o istanza locale
  • Chiave API OpenAI
  • Cluster Milvus
  • Python 3.10 installato localmente

Configurare il cluster Milvus

Se avete già installato un cluster K8s per la produzione, potete saltare questo passaggio e procedere direttamente all'installazione di Milvus Operator. In caso contrario, potete seguire i passi per distribuire un cluster Milvus con Milvus Operator.

Le singole entità (nel nostro caso, i ticket di assistenza e gli articoli della knowledge base) sono memorizzate in una "collezione": dopo aver configurato il cluster, è necessario creare una collezione. Scegliere un nome adatto e impostare la dimensione a 1536 per adattarla alla dimensionalità del vettore generato dal servizio OpenAI embeddings.

Dopo la creazione, registrare le informazioni sull'endpoint e sull'autenticazione.

Impostare la connessione in Airbyte

Il nostro database è pronto, spostiamo i dati! Per farlo, dobbiamo configurare una connessione in Airbyte. È possibile registrarsi per un account Airbyte cloud su cloud.airbyte.com o avviare un'istanza locale come descritto nella documentazione.

Impostare la sorgente

Una volta che la vostra istanza è in funzione, dobbiamo impostare la connessione: fate clic su "Nuova connessione" e scegliete il connettore "Zendesk Support" come sorgente. Dopo aver fatto clic sul pulsante "Prova e salva", Airbyte verificherà se la connessione può essere stabilita.

Sul cloud di Airbyte, è possibile autenticarsi facilmente facendo clic sul pulsante Autentica. Se si utilizza un'istanza Airbyte locale, seguire le indicazioni riportate nella pagina della documentazione.

Impostazione della destinazione

Se tutto funziona correttamente, il passo successivo è impostare la destinazione in cui spostare i dati. Qui si sceglie il connettore "Milvus".

Il connettore Milvus svolge tre funzioni:

  • Suddivisione e formattazione - Suddivide i record Zendesk in testo e metadati. Se il testo è più grande della dimensione specificata, i record vengono suddivisi in più parti che vengono caricate singolarmente nella raccolta. La suddivisione del testo (o chunking) può avvenire, ad esempio, nel caso di ticket di assistenza o articoli di conoscenza di grandi dimensioni. Suddividendo il testo, si può garantire che le ricerche diano sempre risultati utili.

Scegliamo una dimensione di 1000 token e i campi di testo corpo, titolo, descrizione e oggetto, poiché questi saranno presenti nei dati che riceveremo da Zendesk.

  • Incorporamento - L'uso di modelli di apprendimento automatico trasforma i pezzi di testo prodotti dalla parte di elaborazione in incorporazioni vettoriali che possono essere ricercate per somiglianza semantica. Per creare gli embeddings, è necessario fornire la chiave API OpenAI. Airbyte invierà ogni pezzo a OpenAI e aggiungerà il vettore risultante alle entità caricate nel cluster Milvus.
  • Indicizzazione - Una volta vettorializzati i pezzi, è possibile caricarli nel database. Per farlo, inserire le informazioni ottenute durante la configurazione del cluster e della collezione in Milvus cluster.
    Facendo clic su "Prova e salva" si verificherà se tutto è allineato correttamente (credenziali valide, la raccolta esiste e ha la stessa dimensionalità vettoriale dell'incorporazione configurata, ecc.)

Impostazione del flusso di sincronizzazione dei flussi

L'ultimo passo prima che i dati siano pronti a fluire è la selezione dei "flussi" da sincronizzare. Un flusso è una raccolta di record nell'origine. Poiché Zendesk supporta un gran numero di flussi che non sono rilevanti per il nostro caso d'uso, selezioniamo solo "ticket" e "articoli" e disabilitiamo tutti gli altri per risparmiare larghezza di banda e assicurarci che solo le informazioni rilevanti vengano visualizzate nelle ricerche:

È possibile selezionare i campi da estrarre dall'origine facendo clic sul nome del flusso. La modalità di sincronizzazione "Incrementale | Append + Deduped" significa che le successive esecuzioni della connessione mantengono Zendesk e Milvus sincronizzati, trasferendo però un numero minimo di dati (solo gli articoli e i ticket modificati dall'ultima esecuzione).

Non appena la connessione è stata impostata, Airbyte inizierà a sincronizzare i dati. Possono essere necessari alcuni minuti per apparire nella raccolta di Milvus.

Se si seleziona una frequenza di replica, Airbyte verrà eseguito regolarmente per mantenere la raccolta Milvus aggiornata con le modifiche agli articoli Zendesk e ai problemi appena creati.

Controllo del flusso

È possibile verificare nell'interfaccia utente del cluster Milvus come sono strutturati i dati nella raccolta navigando nell'area di gioco ed eseguendo una query "Query Data" con un filtro impostato su "_ab_stream == \"tickets\"".

Come si può vedere nella vista del risultato, ogni record proveniente da Zendesk è memorizzato come entità separata in Milvus con tutti i metadati specificati. Il pezzo di testo su cui si basa l'incorporamento è mostrato come proprietà "text" - questo è il testo che è stato incorporato usando OpenAI e sarà quello su cui effettueremo la ricerca.

Creare l'applicazione Streamlit per interrogare la collezione

I nostri dati sono pronti: ora dobbiamo creare l'applicazione per utilizzarli. In questo caso, l'applicazione sarà un semplice modulo di supporto per l'invio di casi di assistenza da parte degli utenti. Quando l'utente preme invio, faremo due cose:

  • Ricerca di ticket simili inviati da utenti della stessa organizzazione.
  • Ricerca di articoli basati sulla conoscenza che potrebbero essere rilevanti per l'utente.

In entrambi i casi, sfrutteremo la ricerca semantica utilizzando gli embeddings di OpenAI. A tal fine, anche la descrizione del problema inserito dall'utente viene incorporata e utilizzata per recuperare entità simili dal cluster Milvus. Se ci sono risultati rilevanti, vengono mostrati sotto il modulo.

Configurazione dell'ambiente UI

È necessaria un'installazione locale di Python, poiché utilizzeremo Streamlit per implementare l'applicazione.

Per prima cosa, installare Streamlit, la libreria client Milvus e la libreria client OpenAI a livello locale:

pip install streamlit pymilvus openai

Per rendere un modulo di supporto di base, creare un file python basic_support_form.py:

import streamlit as st

with st.form("my_form"):
    st.write("Submit a support case")
    text_val = st.text_area("Describe your problem")

    submitted = st.form_submit_button("Submit")
    if submitted:
        # TODO check for related support cases and articles
        st.write("Submitted!")

Per eseguire l'applicazione, utilizzare Streamlit run:

streamlit run basic_support_form.py

Questo renderà un modulo di base:

Il codice di questo esempio è disponibile anche su GitHub.

Impostare il servizio di query del backend

Quindi, verifichiamo la presenza di ticket aperti esistenti che potrebbero essere rilevanti. Per farlo, abbiamo incorporato il testo inserito dall'utente con OpenAI, quindi abbiamo effettuato una ricerca per similarità sulla nostra collezione, filtrando i ticket ancora aperti. Se ce n'è uno con una distanza molto bassa tra il ticket fornito e quello esistente, lo comunichiamo all'utente e non lo inviamo:

import streamlit as st
import os
import pymilvus
import openai


with st.form("my_form"):
    st.write("Submit a support case")
    text_val = st.text_area("Describe your problem?")

    submitted = st.form_submit_button("Submit")
    if submitted:
        import os
        import pymilvus
        import openai

        org_id = 360033549136 # TODO Load from customer login data

        pymilvus.connections.connect(uri=os.environ["MILVUS_URL"], token=os.environ["MILVUS_TOKEN"])
        collection = pymilvus.Collection("zendesk")

        embedding = openai.Embedding.create(input=text_val, model="text-embedding-ada-002")['data'][0]['embedding']

        results = collection.search(data=[embedding], anns_field="vector", param={}, limit=2, output_fields=["_id", "subject", "description"], expr=f'status == "new" and organization_id == {org_id}')

        st.write(results[0])
        if len(results[0]) > 0 and results[0].distances[0] < 0.35:
            matching_ticket = results[0][0].entity
            st.write(f"This case seems very similar to {matching_ticket.get('subject')} (id #{matching_ticket.get('_id')}). Make sure it has not been submitted before")
        else:
            st.write("Submitted!")
            

Qui accadono diverse cose:

  • Viene impostata la connessione al cluster Milvus.
  • Il servizio OpenAI viene utilizzato per generare un embedding della descrizione inserita dall'utente.
  • Viene eseguita una ricerca di somiglianza, filtrando i risultati in base allo stato del ticket e all'id dell'organizzazione (poiché solo i ticket aperti della stessa organizzazione sono rilevanti).
  • Se ci sono risultati e la distanza tra i vettori di incorporamento del ticket esistente e del testo appena inserito è inferiore a una certa soglia, viene segnalato questo fatto.

Per eseguire la nuova applicazione, è necessario impostare prima le variabili di ambiente per OpenAI e Milvus:

export MILVUS_TOKEN=...
export MILVUS_URL=https://...
export OPENAI_API_KEY=sk-...

streamlit run app.py

Quando si tenta di inviare un ticket già esistente, il risultato sarà questo:

Il codice di questo esempio è disponibile anche su GitHub.

Mostrare più informazioni rilevanti

Come si può vedere nell'output di debug verde nascosto nella versione finale, due ticket corrispondevano alla nostra ricerca (in stato nuovo, dall'organizzazione corrente e vicino al vettore di incorporamento). Tuttavia, il primo (rilevante) si è classificato più in alto del secondo (irrilevante in questa situazione), il che si riflette nel valore di distanza più basso. Questa relazione viene catturata nei vettori di incorporamento senza che le parole vengano direttamente abbinate, come in una normale ricerca full-text.

Per concludere, mostriamo le informazioni utili dopo l'invio del ticket, in modo da fornire all'utente il maggior numero possibile di informazioni rilevanti in anticipo.

A tale scopo, faremo una seconda ricerca dopo l'invio del ticket per recuperare gli articoli della knowledge base che corrispondono maggiormente:

   ......
   
        else:
            # TODO Actually send out the ticket
            st.write("Submitted!")
            article_results = collection.search(data=[embedding], anns_field="vector", param={}, limit=5, output_fields=["title", "html_url"], expr=f'_ab_stream == "articles"')
            st.write(article_results[0])
            if len(article_results[0]) > 0:
                st.write("We also found some articles that might help you:")
                for hit in article_results[0]:
                    if hit.distance < 0.362:
                        st.write(f"* [{hit.entity.get('title')}]({hit.entity.get('html_url')})")

Se non c'è nessun ticket di assistenza aperto con un punteggio di somiglianza elevato, il nuovo ticket viene inviato e gli articoli di conoscenza pertinenti vengono mostrati di seguito:

Il codice di questo esempio è disponibile anche su Github.

Conclusione

Anche se l'interfaccia utente mostrata qui non è un vero e proprio modulo di assistenza, ma un esempio per illustrare il caso d'uso, la combinazione di Airbyte e Milvus è molto potente: consente di caricare facilmente il testo da un'ampia varietà di fonti (da database come Postgres ad API come Zendesk o GitHub, fino a fonti completamente personalizzate costruite utilizzando l'SDK o il costruttore di connettori visivi di Airbyte) e di indicizzarlo in forma incorporata in Milvus, un potente motore di ricerca vettoriale in grado di scalare su enormi quantità di dati.

Airbyte e Milvus sono open source e completamente gratuiti da utilizzare sulla vostra infrastruttura, con offerte cloud per scaricare le operazioni, se lo desiderate.

Oltre al classico caso d'uso della ricerca semantica illustrato in questo articolo, l'impostazione generale può essere utilizzata anche per costruire un chat bot in grado di rispondere alle domande utilizzando il metodo RAG (Retrieval Augmented Generation), sistemi di raccomandazione o contribuire a rendere la pubblicità più pertinente ed efficiente.