Airbyte : Infrastructure de mouvement de données open-source
Airbyte est une infrastructure de mouvement de données open-source pour la construction de pipelines de données d'extraction et de chargement (EL). Elle est conçue pour être polyvalente, évolutive et facile à utiliser. Le catalogue de connecteurs d'Airbyte est livré "prêt à l'emploi" avec plus de 350 connecteurs préconstruits. Ces connecteurs peuvent être utilisés pour commencer à répliquer des données d'une source vers une destination en quelques minutes seulement.
Principaux composants d'Airbyte
1. Catalogue de connecteurs
- Plus de 350 connecteurs préconstruits: Le catalogue de connecteurs d'Airbyte est livré " prêt à l'emploi " avec plus de 350 connecteurs préconstruits. Ces connecteurs peuvent être utilisés pour commencer à répliquer des données d'une source vers une destination en seulement quelques minutes.
- Constructeur de connecteurs sans code: Vous pouvez facilement étendre les fonctionnalités d'Airbyte pour prendre en charge vos cas d'utilisation personnalisés grâce à des outils tels que le No-Code Connector Builder.
2. La plateforme
La plateforme d'Airbyte fournit tous les services horizontaux nécessaires pour configurer et mettre à l'échelle les opérations de déplacement de données, disponibles en mode géré dans le nuage ou en mode autogéré.
3. L'interface utilisateur
Airbyte dispose d'une interface utilisateur, de PyAirbyte (bibliothèque Python), d'une API et d'un fournisseur Terraform pour s'intégrer à votre outil préféré et à votre approche de la gestion de l'infrastructure.
Grâce à Airbyte, les utilisateurs peuvent intégrer des sources de données dans le cluster Milvus pour la recherche de similarités.
Avant de commencer
Vous aurez besoin de
- un compte Zendesk (ou une autre source de données à partir de laquelle vous souhaitez synchroniser les données)
- Compte Airbyte ou instance locale
- Clé API OpenAI
- Cluster Milvus
- Python 3.10 installé localement
Configuration du cluster Milvus
Si vous avez déjà déployé un cluster K8s pour la production, vous pouvez sauter cette étape et procéder directement au déploiement de Milvus Operator. Sinon, vous pouvez suivre les étapes de déploiement d'un cluster Milvus avec Milvus Operator.
Les entités individuelles (dans notre cas, les tickets d'assistance et les articles de la base de connaissances) sont stockées dans une "collection" - une fois que votre cluster est configuré, vous devez créer une collection. Choisissez un nom approprié et définissez la dimension à 1536 pour correspondre à la dimensionnalité vectorielle générée par le service d'intégration OpenAI.
Après la création, enregistrez le point de terminaison et les informations d'authentification.
Configurer la connexion dans Airbyte
Notre base de données est prête, déplaçons quelques données ! Pour ce faire, nous devons configurer une connexion dans Airbyte. Soit vous vous inscrivez pour un compte Airbyte cloud sur cloud.airbyte.com, soit vous démarrez une instance locale comme décrit dans la documentation.
Configurer la source
Une fois que votre instance fonctionne, nous devons configurer la connexion - cliquez sur "Nouvelle connexion" et choisissez le connecteur "Zendesk Support" comme source. Après avoir cliqué sur le bouton "Test and Save", Airbyte vérifiera si la connexion peut être établie.
Sur le nuage Airbyte, vous pouvez facilement vous authentifier en cliquant sur le bouton Authentifier. Si vous utilisez une instance locale d'Airbyte, suivez les instructions décrites sur la page de documentation.
Configurer la destination
Si tout fonctionne correctement, l'étape suivante consiste à configurer la destination vers laquelle déplacer les données. Pour ce faire, choisissez le connecteur "Milvus".
Le connecteur Milvus a trois fonctions :
- Fractionnement et formatage - Il divise les enregistrements Zendesk en texte et en métadonnées. Si le texte est plus grand que la taille de morceau spécifiée, les enregistrements sont divisés en plusieurs parties qui sont chargées individuellement dans la collection. Le fractionnement du texte (ou chunking) peut, par exemple, se produire dans le cas de tickets d'assistance ou d'articles de connaissance volumineux. En divisant le texte, vous pouvez vous assurer que les recherches donnent toujours des résultats utiles.
Prenons une taille de chunk de 1 000 tokens et des champs de texte tels que le corps, le titre, la description et le sujet, car ils seront présents dans les données que nous recevrons de Zendesk.
- Incorporation - L'utilisation de modèles d'apprentissage automatique transforme les morceaux de texte produits par la partie traitement en incrustations vectorielles que vous pouvez ensuite rechercher pour la similarité sémantique. Pour créer les encastrements, vous devez fournir la clé de l'API OpenAI. Airbyte enverra chaque morceau à OpenAI et ajoutera le vecteur résultant aux entités chargées dans votre cluster Milvus.
- Indexation - Une fois que vous avez vectorisé les morceaux, vous pouvez les charger dans la base de données. Pour ce faire, insérez les informations que vous avez obtenues lors de la configuration de votre cluster et de votre collection dans le cluster Milvus. En cliquant sur "Test and save", vous vérifierez que tout est en ordre (informations d'identification valides, collection existante et ayant la même dimension vectorielle que l'intégration configurée, etc.)
Configurer le flux de synchronisation des flux
La dernière étape avant que les données ne soient prêtes à circuler consiste à sélectionner les "flux" à synchroniser. Un flux est une collection d'enregistrements dans la source. Comme Zendesk prend en charge un grand nombre de flux qui ne sont pas pertinents pour notre cas d'utilisation, sélectionnons uniquement les "tickets" et les "articles" et désactivons tous les autres pour économiser de la bande passante et nous assurer que seules les informations pertinentes apparaîtront dans les recherches :
Vous pouvez sélectionner les champs à extraire de la source en cliquant sur le nom du flux. Le mode de synchronisation "Incrémental | Append + Deduped" signifie que les exécutions de connexion ultérieures maintiennent la synchronisation entre Zendesk et Milvus tout en transférant un minimum de données (uniquement les articles et les tickets qui ont changé depuis la dernière exécution).Dès que la connexion est établie, Airbyte commence à synchroniser les données. Cela peut prendre quelques minutes pour apparaître dans votre collection Milvus.
Si vous sélectionnez une fréquence de réplication, Airbyte s'exécutera régulièrement pour tenir votre collection Milvus à jour des modifications apportées aux articles Zendesk et des problèmes nouvellement créés.
Flux de vérification
Vous pouvez vérifier dans l'interface utilisateur du cluster Milvus comment les données sont structurées dans la collection en naviguant vers l'aire de jeu et en exécutant une requête "Query Data" avec un filtre défini sur "_ab_stream == \"tickets"".
Comme vous pouvez le voir dans la vue Résultat, chaque enregistrement provenant de Zendesk est stocké en tant qu'entité distincte dans Milvus avec toutes les métadonnées spécifiées. Le bloc de texte sur lequel l'incorporation est basée est affiché comme la propriété "text" - il s'agit du texte qui a été incorporé à l'aide d'OpenAI et sur lequel nous allons effectuer une recherche.Construire l'application Streamlit pour interroger la collection
Nos données sont prêtes - nous devons maintenant créer l'application qui les utilisera. Dans ce cas, l'application sera un simple formulaire d'assistance permettant aux utilisateurs de soumettre des cas d'assistance. Lorsque l'utilisateur clique sur "Envoyer", nous faisons deux choses :
- rechercher des tickets similaires soumis par des utilisateurs de la même organisation
- Rechercher des articles basés sur la connaissance qui pourraient être pertinents pour l'utilisateur.
Dans les deux cas, nous exploiterons la recherche sémantique à l'aide d'encastrements OpenAI. Pour ce faire, la description du problème saisi par l'utilisateur est également intégrée et utilisée pour extraire des entités similaires du cluster Milvus. S'il y a des résultats pertinents, ils sont affichés sous le formulaire.
Configuration de l'environnement de l'interface utilisateur
Vous aurez besoin d'une installation locale de Python car nous utiliserons Streamlit pour mettre en œuvre l'application.
Commencez par installer localement Streamlit, la bibliothèque client Milvus et la bibliothèque client OpenAI :
pip install streamlit pymilvus openai
Pour rendre un formulaire de support de base, créez un fichier python basic_support_form.py
:
import streamlit as st
with st.form("my_form"):
st.write("Submit a support case")
text_val = st.text_area("Describe your problem")
submitted = st.form_submit_button("Submit")
if submitted:
# TODO check for related support cases and articles
st.write("Submitted!")
Pour exécuter votre application, utilisez Streamlit run :
streamlit run basic_support_form.py
Cela rendra un formulaire de base :
Le code de cet exemple est également disponible sur GitHub.Mise en place d'un service de requête backend
Ensuite, vérifions s'il existe des tickets ouverts qui pourraient être pertinents. Pour ce faire, nous intégrons le texte que l'utilisateur a saisi à l'aide d'OpenAI, puis nous effectuons une recherche de similarité dans notre collection, en filtrant les tickets encore ouverts. S'il y en a un avec une très faible distance entre le ticket fourni et le ticket existant, nous le faisons savoir à l'utilisateur et ne le soumettons pas :
import streamlit as st
import os
import pymilvus
import openai
with st.form("my_form"):
st.write("Submit a support case")
text_val = st.text_area("Describe your problem?")
submitted = st.form_submit_button("Submit")
if submitted:
import os
import pymilvus
import openai
org_id = 360033549136 # TODO Load from customer login data
pymilvus.connections.connect(uri=os.environ["MILVUS_URL"], token=os.environ["MILVUS_TOKEN"])
collection = pymilvus.Collection("zendesk")
embedding = openai.Embedding.create(input=text_val, model="text-embedding-ada-002")['data'][0]['embedding']
results = collection.search(data=[embedding], anns_field="vector", param={}, limit=2, output_fields=["_id", "subject", "description"], expr=f'status == "new" and organization_id == {org_id}')
st.write(results[0])
if len(results[0]) > 0 and results[0].distances[0] < 0.35:
matching_ticket = results[0][0].entity
st.write(f"This case seems very similar to {matching_ticket.get('subject')} (id #{matching_ticket.get('_id')}). Make sure it has not been submitted before")
else:
st.write("Submitted!")
Plusieurs choses se produisent ici :
- La connexion au cluster Milvus est établie.
- Le service OpenAI est utilisé pour générer une intégration de la description saisie par l'utilisateur.
- Une recherche de similarité est effectuée, en filtrant les résultats par le statut du ticket et l'identifiant de l'organisation (car seuls les tickets ouverts de la même organisation sont pertinents).
- S'il y a des résultats et que la distance entre les vecteurs d'intégration du ticket existant et du texte nouvellement saisi est inférieure à un certain seuil, ce fait est signalé.
Pour exécuter la nouvelle application, vous devez d'abord définir les variables d'environnement pour OpenAI et Milvus :
export MILVUS_TOKEN=...
export MILVUS_URL=https://...
export OPENAI_API_KEY=sk-...
streamlit run app.py
Lorsque vous essayez de soumettre un ticket qui existe déjà, voici à quoi ressemblera le résultat :
Le code de cet exemple se trouve également sur GitHub.Afficher des informations plus pertinentes
Comme vous pouvez le voir dans la sortie de débogage verte cachée dans la version finale, deux tickets correspondent à notre recherche (dans le statut nouveau, de l'organisation actuelle, et proche du vecteur d'intégration). Cependant, le premier (pertinent) est mieux classé que le second (non pertinent dans cette situation), ce qui se traduit par une valeur de distance plus faible. Cette relation est capturée dans les vecteurs d'intégration sans faire correspondre directement les mots, comme dans une recherche en texte intégral classique.
Pour conclure, affichons des informations utiles après la soumission du ticket afin de donner à l'utilisateur autant d'informations pertinentes que possible.
Pour ce faire, nous allons effectuer une deuxième recherche après l'envoi du ticket pour récupérer les articles de la base de connaissances les plus pertinents :
......
else:
# TODO Actually send out the ticket
st.write("Submitted!")
article_results = collection.search(data=[embedding], anns_field="vector", param={}, limit=5, output_fields=["title", "html_url"], expr=f'_ab_stream == "articles"')
st.write(article_results[0])
if len(article_results[0]) > 0:
st.write("We also found some articles that might help you:")
for hit in article_results[0]:
if hit.distance < 0.362:
st.write(f"* [{hit.entity.get('title')}]({hit.entity.get('html_url')})")
S'il n'y a pas de ticket d'assistance ouvert avec un score de similarité élevé, le nouveau ticket est soumis et les articles de la base de connaissances pertinents sont affichés ci-dessous :
Le code de cet exemple peut également être trouvé sur Github.Conclusion
Bien que l'interface présentée ici ne soit pas un formulaire d'assistance réel mais un exemple pour illustrer le cas d'utilisation, la combinaison d'Airbyte et de Milvus est très puissante - elle permet de charger facilement du texte à partir d'une grande variété de sources (des bases de données comme Postgres aux API comme Zendesk ou GitHub en passant par des sources entièrement personnalisées construites à l'aide du SDK d'Airbyte ou du constructeur de connecteurs visuels) et de l'indexer sous forme intégrée dans Milvus, un puissant moteur de recherche vectoriel capable de s'adapter à d'énormes quantités de données.
Airbyte et Milvus sont des logiciels libres et entièrement gratuits à utiliser sur votre infrastructure, avec des offres en nuage pour décharger les opérations si vous le souhaitez.
Au-delà du cas d'utilisation classique de la recherche sémantique illustré dans cet article, la configuration générale peut également être utilisée pour construire un robot de conversation répondant à des questions en utilisant la méthode RAG (Retrieval Augmented Generation), des systèmes de recommandation, ou aider à rendre la publicité plus pertinente et plus efficace.