milvus-logo
LFAI
Home
  • Integraciones
    • Fuentes de datos

Airbyte: Infraestructura de movimiento de datos de código abierto

Airbyte es una infraestructura de movimiento de datos de código abierto para crear canalizaciones de datos de extracción y carga (EL). Está diseñada para ofrecer versatilidad, escalabilidad y facilidad de uso. El catálogo de conectores de Airbyte incluye más de 350 conectores preconfigurados. Estos conectores pueden utilizarse para empezar a replicar datos de un origen a un destino en cuestión de minutos.

Principales componentes de Airbyte

1. Catálogo de conectores

  • Más de 350 conectores preconfigurados: El catálogo de conectores de Airbyte viene "listo para usar" con más de 350 conectores pre-construidos. Estos conectores se pueden utilizar para empezar a replicar datos de un origen a un destino en sólo unos minutos.
  • Constructor de conectores sin código: Puede ampliar fácilmente la funcionalidad de Airbyte para soportar sus casos de uso personalizados a través de herramientas como el No-Code Connector Builder.

2. La plataforma

La plataforma de Airbyte proporciona todos los servicios horizontales necesarios para configurar y escalar las operaciones de movimiento de datos, disponibles como gestionados en la nube o autogestionados.

3. La Interfaz de Usuario

Airbyte cuenta con una interfaz de usuario, PyAirbyte (biblioteca Python), API y Terraform Provider para integrarse con sus herramientas y enfoque preferidos para la gestión de infraestructuras.

Con la capacidad de Airbyte, los usuarios pueden integrar fuentes de datos en el clúster Milvus para la búsqueda de similitudes.

Antes de empezar

Necesitará

  • Cuenta de Zendesk (u otra fuente de datos desde la que desee sincronizar datos)
  • Cuenta Airbyte o instancia local
  • Clave API de OpenAI
  • Clúster Milvus
  • Python 3.10 instalado localmente

Configurar el clúster Milvus

Si ya ha desplegado un cluster K8s para producción, puede saltarse este paso y proceder directamente a desplegar Milvus Operator. Si no, puede seguir los pasos para desplegar un cluster Milvus con Milvus Operator.

Las entidades individuales (en nuestro caso, los tickets de soporte y los artículos de la base de conocimientos) se almacenan en una "colección" - una vez configurado el clúster, deberá crear una colección. Elija un nombre adecuado y establezca la Dimensión en 1536 para que coincida con la dimensionalidad vectorial generada por el servicio de incrustación de OpenAI.

Tras la creación, registra el endpoint y la información de autenticación.

Configurar la conexión en Airbyte

Nuestra base de datos está lista, ¡vamos a mover algunos datos! Para ello, necesitamos configurar una conexión en Airbyte. Regístrese para obtener una cuenta en la nube Airbyte en cloud.airbyte.com o inicie una instancia local como se describe en la documentación.

Configurar la fuente

Una vez que su instancia esté funcionando, tenemos que configurar la conexión - haga clic en "Nueva conexión" y elija el conector "Zendesk Support" como la fuente. Después de hacer clic en el botón "Probar y guardar", Airbyte comprobará si se puede establecer la conexión.

En la nube de Airbyte, puede autenticarse fácilmente haciendo clic en el botón Autenticar. Si utiliza una instancia local de Airbyte, siga las instrucciones indicadas en la página de documentación.

Configurar el destino

Si todo funciona correctamente, el siguiente paso es configurar el destino al que mover los datos. Aquí, elija el conector "Milvus".

El conector Milvus hace tres cosas

  • Chunking y Formateo - Divide los registros de Zendesk en texto y metadatos. Si el texto es mayor que el tamaño de trozo especificado, los registros se dividen en varias partes que se cargan en la colección individualmente. La división de texto (o chunking) puede ocurrir, por ejemplo, en el caso de tickets de soporte o artículos de conocimiento de gran tamaño. Al dividir el texto, puede asegurarse de que las búsquedas siempre produzcan resultados útiles.

Vamos a usar un tamaño de trozo de 1000 tokens y campos de texto de cuerpo, título, descripción y asunto, ya que estos estarán presentes en los datos que recibiremos de Zendesk.

  • Incrustación - El uso de modelos de aprendizaje automático transforma los trozos de texto producidos por la parte de procesamiento en incrustaciones vectoriales que luego puedes buscar por similitud semántica. Para crear las incrustaciones, debes proporcionar la clave de la API de OpenAI. Airbyte enviará cada trozo a OpenAI y añadirá el vector resultante a las entidades cargadas en su clúster Milvus.
  • Indexación - Una vez que haya vectorizado los chunks, puede cargarlos en la base de datos. Para ello, inserta la información que obtuviste al configurar tu cluster y colección en Milvus cluster.
    Haciendo clic en "Probar y guardar" comprobará si todo está alineado correctamente (credenciales válidas, la colección existe y tiene la misma dimensionalidad vectorial que la incrustación configurada, etc.)

Configurar el flujo de sincronización

El último paso antes de que los datos estén listos para fluir es seleccionar qué "flujos" sincronizar. Un flujo es una colección de registros en la fuente. Como Zendesk soporta un gran número de flujos que no son relevantes para nuestro caso de uso, vamos a seleccionar sólo "tickets" y "artículos" y desactivar todos los demás para ahorrar ancho de banda y asegurarnos de que sólo la información relevante aparecerá en las búsquedas:

Puede seleccionar qué campos extraer de la fuente haciendo clic en el nombre del flujo. El modo de sincronización "Incremental | Append + Deduped" significa que las subsiguientes ejecuciones de la conexión mantienen a Zendesk y Milvus sincronizados mientras transfieren un mínimo de datos (solo los artículos y tickets que han cambiado desde la última ejecución).

En cuanto se establezca la conexión, Airbyte comenzará a sincronizar los datos. Puede tardar unos minutos en aparecer en su colección Milvus.

Si selecciona una frecuencia de replicación, Airbyte se ejecutará regularmente para mantener su colección de Milvus actualizada con los cambios en los artículos de Zendesk y los problemas recién creados.

Comprobar el flujo

Puede comprobar en la interfaz de usuario del clúster Milvus cómo están estructurados los datos en la colección navegando a la zona de juegos y ejecutando una consulta "Query Data" con un filtro establecido en "_ab_stream == \"tickets\"".

Como se puede ver en la vista Resultado, cada registro procedente de Zendesk se almacena como entidades separadas en Milvus con todos los metadatos especificados. El trozo de texto en el que se basa la incrustación se muestra como la propiedad "text" - este es el texto que se incrustó utilizando OpenAI y será lo que buscaremos.

Crear la aplicación Streamlit para consultar la colección

Nuestros datos están listos - ahora tenemos que construir la aplicación para utilizarlos. En este caso, la aplicación será un simple formulario de soporte para que los usuarios envíen casos de soporte. Cuando el usuario pulse enviar, haremos dos cosas:

  • Buscar tickets similares enviados por usuarios de la misma organización.
  • Buscar artículos basados en conocimientos que puedan ser relevantes para el usuario.

En ambos casos, aprovecharemos la búsqueda semántica mediante incrustaciones de OpenAI. Para ello, la descripción del problema introducida por el usuario también se incrusta y se utiliza para recuperar entidades similares del clúster Milvus. Si hay resultados relevantes, se muestran debajo del formulario.

Configurar el entorno de la interfaz de usuario

Necesitará una instalación local de Python, ya que utilizaremos Streamlit para implementar la aplicación.

En primer lugar, instale Streamlit, la biblioteca cliente Milvus, y la biblioteca cliente OpenAI localmente:

pip install streamlit pymilvus openai

Para renderizar un formulario de soporte básico, crea un archivo python basic_support_form.py:

import streamlit as st

with st.form("my_form"):
    st.write("Submit a support case")
    text_val = st.text_area("Describe your problem")

    submitted = st.form_submit_button("Submit")
    if submitted:
        # TODO check for related support cases and articles
        st.write("Submitted!")

Para ejecutar su aplicación, utilice Streamlit run:

streamlit run basic_support_form.py

Esto renderizará un formulario básico:

El código de este ejemplo también se puede encontrar en GitHub.

Configurar el servicio de consulta backend

A continuación, vamos a comprobar si existen tickets abiertos que puedan ser relevantes. Para hacer esto, incrustamos el texto que el usuario ingresó usando OpenAI, luego hicimos una búsqueda de similitud en nuestra colección, filtrando por tickets aún abiertos. Si hay uno con una distancia muy baja entre el ticket suministrado y el ticket existente, se lo hacemos saber al usuario y no lo enviamos:

import streamlit as st
import os
import pymilvus
import openai


with st.form("my_form"):
    st.write("Submit a support case")
    text_val = st.text_area("Describe your problem?")

    submitted = st.form_submit_button("Submit")
    if submitted:
        import os
        import pymilvus
        import openai

        org_id = 360033549136 # TODO Load from customer login data

        pymilvus.connections.connect(uri=os.environ["MILVUS_URL"], token=os.environ["MILVUS_TOKEN"])
        collection = pymilvus.Collection("zendesk")

        embedding = openai.Embedding.create(input=text_val, model="text-embedding-ada-002")['data'][0]['embedding']

        results = collection.search(data=[embedding], anns_field="vector", param={}, limit=2, output_fields=["_id", "subject", "description"], expr=f'status == "new" and organization_id == {org_id}')

        st.write(results[0])
        if len(results[0]) > 0 and results[0].distances[0] < 0.35:
            matching_ticket = results[0][0].entity
            st.write(f"This case seems very similar to {matching_ticket.get('subject')} (id #{matching_ticket.get('_id')}). Make sure it has not been submitted before")
        else:
            st.write("Submitted!")
            

Aquí ocurren varias cosas:

  • Se establece la conexión con el clúster Milvus.
  • Se utiliza el servicio OpenAI para generar una incrustación de la descripción introducida por el usuario.
  • Se realiza una búsqueda de similitudes, filtrando los resultados por el estado del ticket y el id de la organización (ya que sólo son relevantes los tickets abiertos de la misma organización).
  • Si hay resultados y la distancia entre los vectores de incrustación del ticket existente y el texto recién introducido está por debajo de un determinado umbral, llama la atención sobre este hecho.

Para ejecutar la nueva aplicación, es necesario configurar primero las variables de entorno para OpenAI y Milvus:

export MILVUS_TOKEN=...
export MILVUS_URL=https://...
export OPENAI_API_KEY=sk-...

streamlit run app.py

Al intentar enviar un ticket que ya existe, el resultado será el siguiente:

El código de este ejemplo también se puede encontrar en GitHub.

Mostrar más información relevante

Como puede ver en la salida de depuración verde oculta en la versión final, dos tickets coincidían con nuestra búsqueda (en estado nuevo, de la organización actual y cerca del vector de incrustación). Sin embargo, el primero (relevante) se clasificó mejor que el segundo (irrelevante en esta situación), lo que se refleja en el menor valor de distancia. Esta relación queda reflejada en los vectores de incrustación sin que las palabras coincidan directamente, como en una búsqueda normal de texto completo.

Para terminar, vamos a mostrar información útil después de que se envíe el ticket para dar al usuario tanta información relevante por adelantado como sea posible.

Para ello, vamos a realizar una segunda búsqueda después de enviar el ticket para obtener los artículos de la base de conocimientos que más coincidan:

   ......
   
        else:
            # TODO Actually send out the ticket
            st.write("Submitted!")
            article_results = collection.search(data=[embedding], anns_field="vector", param={}, limit=5, output_fields=["title", "html_url"], expr=f'_ab_stream == "articles"')
            st.write(article_results[0])
            if len(article_results[0]) > 0:
                st.write("We also found some articles that might help you:")
                for hit in article_results[0]:
                    if hit.distance < 0.362:
                        st.write(f"* [{hit.entity.get('title')}]({hit.entity.get('html_url')})")

Si no hay ningún ticket de soporte abierto con una alta puntuación de similitud, el nuevo ticket se envía y los artículos de conocimiento relevantes se muestran a continuación:

El código de este ejemplo también se puede encontrar en Github.

Conclusión

Aunque la interfaz de usuario que se muestra aquí no es un formulario de soporte real, sino un ejemplo para ilustrar el caso de uso, la combinación de Airbyte y Milvus es muy potente: facilita la carga de texto desde una amplia variedad de fuentes (desde bases de datos como Postgres, pasando por API como Zendesk o GitHub, hasta fuentes completamente personalizadas creadas mediante el SDK de Airbyte o el creador de conectores visuales) y su indexación de forma incrustada en Milvus, un potente motor de búsqueda vectorial capaz de escalar a enormes cantidades de datos.

Airbyte y Milvus son de código abierto y de uso completamente gratuito en su infraestructura, con ofertas en la nube para descargar las operaciones si lo desea.

Más allá del caso de uso clásico de búsqueda semántica ilustrado en este artículo, la configuración general también puede utilizarse para construir un bot de chat que responda a preguntas utilizando el método RAG (Retrieval Augmented Generation), sistemas de recomendación o para ayudar a que la publicidad sea más relevante y eficiente.