milvus-logo
LFAI
Home
  • Integrações
    • Fontes de dados

Airbyte: Infraestrutura de movimentação de dados de código aberto

A Airbyte é uma infraestrutura de movimentação de dados de código aberto para a criação de pipelines de dados de extração e carregamento (EL). Foi concebida para ser versátil, escalável e fácil de utilizar. O catálogo de conectores da Airbyte vem "pronto para uso" com mais de 350 conectores pré-construídos. Estes conectores podem ser utilizados para começar a replicar dados de uma fonte para um destino em apenas alguns minutos.

Principais componentes do Airbyte

1. Catálogo de conectores

  • Mais de 350 conectores pré-construídos: O catálogo de conectores do Airbyte vem "out-of-the-box" com mais de 350 conectores pré-construídos. Estes conectores podem ser utilizados para começar a replicar dados de uma fonte para um destino em apenas alguns minutos.
  • Criador de conectores sem código: Pode facilmente alargar a funcionalidade da Airbyte para suportar os seus casos de utilização personalizados através de ferramentas como o No-Code Connector Builder.

2. A plataforma

A plataforma da Airbyte fornece todos os serviços horizontais necessários para configurar e escalar operações de movimentação de dados, disponíveis como geridos na nuvem ou autogeridos.

3. A interface de utilizador

A Airbyte possui uma interface de utilizador, PyAirbyte (biblioteca Python), API e Terraform Provider para integrar com as suas ferramentas preferidas e abordagem à gestão de infra-estruturas.

Com a capacidade da Airbyte, os utilizadores podem integrar fontes de dados no cluster Milvus para pesquisa de semelhanças.

Antes de começar

Você precisará de:

  • Conta do Zendesk (ou outra fonte de dados da qual deseja sincronizar os dados)
  • Conta do Airbyte ou instância local
  • Chave da API do OpenAI
  • Cluster do Milvus
  • Python 3.10 instalado localmente

Configurar o cluster do Milvus

Se já tiver implementado um cluster K8s para produção, pode saltar este passo e avançar diretamente para a implementação do Milvus Operator. Caso contrário, pode seguir os passos para implementar um cluster Milvus com o Milvus Operator.

As entidades individuais (no nosso caso, bilhetes de suporte e artigos da base de conhecimentos) são armazenadas numa "coleção" - depois de o seu cluster estar configurado, é necessário criar uma coleção. Escolha um nome adequado e defina a Dimensão para 1536 para corresponder à dimensionalidade do vetor gerado pelo serviço de incorporação do OpenAI.

Após a criação, registe o ponto final e as informações de autenticação.

Configurar a ligação no Airbyte

A nossa base de dados está pronta, vamos transferir alguns dados! Para o fazer, precisamos de configurar uma ligação na Airbyte. Inscreva-se numa conta Airbyte na nuvem em cloud.airbyte.com ou inicie uma instância local conforme descrito na documentação.

Configurar a fonte

Quando sua instância estiver em execução, precisamos configurar a conexão - clique em "Nova conexão" e escolha o conetor "Zendesk Support" como a origem. Depois de clicar no botão "Testar e salvar", o Airbyte verificará se a conexão pode ser estabelecida.

Na nuvem da Airbyte, pode autenticar-se facilmente clicando no botão Autenticar. Quando utilizar uma instância local da Airbyte, siga as instruções descritas na página de documentação.

Configurar o destino

Se tudo estiver a funcionar corretamente, o próximo passo é configurar o destino para onde mover os dados. Aqui, escolha o conetor "Milvus".

O conetor Milvus faz três coisas:

  • Separaçãoe formatação - Divide os registos do Zendesk em texto e metadados. Se o texto for maior do que o tamanho do bloco especificado, os registos são divididos em várias partes que são carregadas individualmente na coleção. A divisão do texto (ou chunking) pode ocorrer, por exemplo, no caso de tickets de suporte ou artigos de conhecimento grandes. Ao dividir o texto, pode garantir que as pesquisas produzem sempre resultados úteis.

Vamos usar um tamanho de bloco de 1000 tokens e campos de texto de corpo, título, descrição e assunto, pois eles estarão presentes nos dados que receberemos do Zendesk.

  • Incorporação - O uso de modelos de aprendizado de máquina transforma os blocos de texto produzidos pela parte de processamento em incorporações de vetor que podem ser pesquisadas para similaridade semântica. Para criar as incorporações, é necessário fornecer a chave da API do OpenAI. O Airbyte envia cada pedaço para o OpenAI e adiciona o vetor resultante às entidades carregadas no seu cluster Milvus.
  • Indexação - Depois de ter vectorizado os chunks, pode carregá-los na base de dados. Para o fazer, insira as informações que obteve aquando da configuração do seu cluster e da sua coleção no cluster Milvus.
    Clicando em "Testar e guardar", verificará se tudo está corretamente alinhado (credenciais válidas, a coleção existe e tem a mesma dimensionalidade vetorial que o embedding configurado, etc.).

Configurar o fluxo de sincronização de fluxo

O último passo antes de os dados estarem prontos para fluir é selecionar quais os "fluxos" a sincronizar. Um fluxo é uma coleção de registros na origem. Como o Zendesk suporta um grande número de fluxos que não são relevantes para o nosso caso de uso, vamos selecionar apenas "tickets" e "artigos" e desativar todos os outros para economizar largura de banda e garantir que apenas as informações relevantes apareçam nas pesquisas:

É possível selecionar os campos a extrair da fonte clicando no nome do fluxo. O modo de sincronização "Incremental | Append + Deduped" significa que as execuções de conexão subsequentes mantêm o Zendesk e o Milvus em sincronia enquanto transferem o mínimo de dados (apenas os artigos e tickets que foram alterados desde a última execução).

Assim que a conexão for configurada, o Airbyte começará a sincronizar os dados. Pode demorar alguns minutos a aparecer na sua coleção Milvus.

Se você selecionar uma frequência de replicação, o Airbyte será executado regularmente para manter sua coleção do Milvus atualizada com as alterações nos artigos do Zendesk e nos problemas recém-criados.

Fluxo de verificação

Você pode verificar na interface do usuário do cluster do Milvus como os dados estão estruturados na coleção navegando até o playground e executando uma consulta "Query Data" com um filtro definido como "_ab_stream == \"tickets\"".

Como você pode ver na exibição Resultado, cada registro proveniente do Zendesk é armazenado como entidades separadas no Milvus com todos os metadados especificados. O trecho de texto no qual a incorporação se baseia é mostrado como a propriedade "text" - este é o texto que foi incorporado usando o OpenAI e será o que pesquisaremos.

Criar a aplicação Streamlit para consultar a coleção

Os nossos dados estão prontos - agora precisamos de construir a aplicação para os utilizar. Neste caso, a aplicação será um simples formulário de apoio para os utilizadores submeterem casos de apoio. Quando o utilizador clicar em submeter, faremos duas coisas:

  • Procurar bilhetes semelhantes submetidos por utilizadores da mesma organização
  • Procurar artigos baseados no conhecimento que possam ser relevantes para o utilizador

Em ambos os casos, utilizaremos a pesquisa semântica usando os embeddings do OpenAI. Para tal, a descrição do problema que o utilizador introduziu também é incorporada e utilizada para obter entidades semelhantes do agrupamento Milvus. Se existirem resultados relevantes, estes são apresentados por baixo do formulário.

Configurar o ambiente da IU

É necessária uma instalação local do Python, uma vez que vamos utilizar o Streamlit para implementar a aplicação.

Primeiro, instale localmente o Streamlit, a biblioteca de clientes Milvus e a biblioteca de clientes OpenAI:

pip install streamlit pymilvus openai

Para apresentar um formulário de suporte básico, crie um ficheiro python basic_support_form.py:

import streamlit as st

with st.form("my_form"):
    st.write("Submit a support case")
    text_val = st.text_area("Describe your problem")

    submitted = st.form_submit_button("Submit")
    if submitted:
        # TODO check for related support cases and articles
        st.write("Submitted!")

Para executar a sua aplicação, utilize o Streamlit run:

streamlit run basic_support_form.py

Isto irá renderizar um formulário básico:

O código para este exemplo também pode ser encontrado no GitHub.

Configurar o serviço de consulta de backend

Em seguida, vamos verificar se há tíquetes abertos existentes que possam ser relevantes. Para isso, incorporamos o texto que o usuário digitou usando o OpenAI e, em seguida, fizemos uma pesquisa de similaridade em nossa coleção, filtrando os tíquetes ainda abertos. Se houver um com uma distância muito baixa entre o tíquete fornecido e o tíquete existente, informamos o usuário e não enviamos:

import streamlit as st
import os
import pymilvus
import openai


with st.form("my_form"):
    st.write("Submit a support case")
    text_val = st.text_area("Describe your problem?")

    submitted = st.form_submit_button("Submit")
    if submitted:
        import os
        import pymilvus
        import openai

        org_id = 360033549136 # TODO Load from customer login data

        pymilvus.connections.connect(uri=os.environ["MILVUS_URL"], token=os.environ["MILVUS_TOKEN"])
        collection = pymilvus.Collection("zendesk")

        embedding = openai.Embedding.create(input=text_val, model="text-embedding-ada-002")['data'][0]['embedding']

        results = collection.search(data=[embedding], anns_field="vector", param={}, limit=2, output_fields=["_id", "subject", "description"], expr=f'status == "new" and organization_id == {org_id}')

        st.write(results[0])
        if len(results[0]) > 0 and results[0].distances[0] < 0.35:
            matching_ticket = results[0][0].entity
            st.write(f"This case seems very similar to {matching_ticket.get('subject')} (id #{matching_ticket.get('_id')}). Make sure it has not been submitted before")
        else:
            st.write("Submitted!")
            

Estão a acontecer várias coisas aqui:

  • A ligação ao cluster Milvus é estabelecida.
  • O serviço OpenAI é utilizado para gerar uma incorporação da descrição que o utilizador introduziu.
  • É efectuada uma pesquisa de semelhanças, filtrando os resultados pelo estado do bilhete e pelo ID da organização (uma vez que apenas os bilhetes abertos da mesma organização são relevantes).
  • Se existirem resultados e a distância entre os vectores de incorporação do bilhete existente e o texto recentemente introduzido for inferior a um determinado limiar, este facto é assinalado.

Para executar a nova aplicação, é necessário definir primeiro as variáveis de ambiente para o OpenAI e o Milvus:

export MILVUS_TOKEN=...
export MILVUS_URL=https://...
export OPENAI_API_KEY=sk-...

streamlit run app.py

Ao tentar submeter um bilhete que já existe, o resultado será o seguinte:

O código para este exemplo também pode ser encontrado no GitHub.

Mostrar mais informações relevantes

Como você pode ver na saída de depuração verde oculta na versão final, dois tickets corresponderam à nossa pesquisa (no status novo, da organização atual e próximo ao vetor de incorporação). No entanto, o primeiro (relevante) teve uma classificação mais elevada do que o segundo (irrelevante nesta situação), o que se reflecte no valor de distância mais baixo. Esta relação é captada nos vectores de incorporação sem fazer corresponder diretamente as palavras, como numa pesquisa normal de texto integral.

Para concluir, vamos mostrar informações úteis após a submissão do ticket para dar ao utilizador o máximo possível de informações relevantes antecipadamente.

Para isso, faremos uma segunda pesquisa depois que o ticket for enviado para buscar os artigos da base de conhecimento com as melhores correspondências:

   ......
   
        else:
            # TODO Actually send out the ticket
            st.write("Submitted!")
            article_results = collection.search(data=[embedding], anns_field="vector", param={}, limit=5, output_fields=["title", "html_url"], expr=f'_ab_stream == "articles"')
            st.write(article_results[0])
            if len(article_results[0]) > 0:
                st.write("We also found some articles that might help you:")
                for hit in article_results[0]:
                    if hit.distance < 0.362:
                        st.write(f"* [{hit.entity.get('title')}]({hit.entity.get('html_url')})")

Se não houver nenhum ticket de suporte aberto com uma pontuação de similaridade alta, o novo ticket é enviado e os artigos de conhecimento relevantes são mostrados abaixo:

O código para este exemplo também pode ser encontrado no Github.

Conclusão

Embora a interface de utilizador aqui apresentada não seja um formulário de apoio real, mas um exemplo para ilustrar o caso de utilização, a combinação de Airbyte e Milvus é muito poderosa - facilita o carregamento de texto a partir de uma grande variedade de fontes (de bases de dados como Postgres sobre APIs como Zendesk ou GitHub até fontes completamente personalizadas construídas utilizando o SDK da Airbyte ou o construtor de conectores visuais) e indexa-o de forma incorporada no Milvus, um poderoso motor de pesquisa vetorial capaz de escalar para grandes quantidades de dados.

O Airbyte e o Milvus são de código aberto e totalmente gratuitos para utilização na sua infraestrutura, com ofertas de nuvem para descarregar operações, se desejado.

Para além do caso de utilização clássico da pesquisa semântica ilustrado neste artigo, a configuração geral também pode ser utilizada para criar um chatbot de resposta a perguntas utilizando o método RAG (Retrieval Augmented Generation), sistemas de recomendação ou para ajudar a tornar a publicidade mais relevante e eficiente.