
Published on 2023-04-21 10:00 by Vitor Sousa

RAG with LlamaIndex, Elasticsearch and Llama3

Check the repo: 🔗 RAG, Llama3, and Elasticsearch project repository.

Goal

Implement a Q&A experience using the Retrieval Augmented Generation (RAG) technique, with Elasticsearch as the vector database. For this implementation, we will use LlamaIndex for indexing and retrieval, alongside a locally hosted Llama3.

Steps

Create project

Ollama

  1. Download and install Ollama
  2. Run the llama3 model (by default, this is the 8B variant):
ollama run llama3
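Optionally, you can confirm the model answers before wiring it into the app. This is a minimal sketch against Ollama's local REST API, assuming the default port 11434 and using only the standard library:
# ollama_check.py (optional, assumed filename): ask llama3 for a short reply via the local API
import json
import urllib.request

payload = json.dumps({
    "model": "llama3",
    "prompt": "Reply with a single word: hello",
    "stream": False,  # return one JSON object instead of a token stream
}).encode()

request = urllib.request.Request(
    "http://localhost:11434/api/generate",
    data=payload,
    headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(request) as resp:
    print(json.loads(resp.read())["response"])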

Elasticsearch

  1. Get Elasticsearch up and running
docker run -p 9200:9200 -d --name elasticsearch \
  -e "discovery.type=single-node" \
  -e "xpack.security.enabled=false" \
  -e "xpack.security.http.ssl.enabled=false" \
  -e "xpack.license.self_generated.type=trial" \
  docker.elastic.co/elasticsearch/elasticsearch:8.13.0

I set up Elasticsearch in Docker, but it is also possible to create a cloud deployment; check it here.
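Before moving on, it is worth confirming the node answers on port 9200. A minimal, dependency-free sketch (assuming the port mapping from the docker run command above):
# es_check.py (optional, assumed filename): confirm Elasticsearch is reachable on localhost:9200
import json
import urllib.request

with urllib.request.urlopen("http://localhost:9200") as resp:
    info = json.loads(resp.read())

print(info["version"]["number"])  # should print 8.13.0 for the image used above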

Application

  1. Install & Initialize Poetry
cd your-project-directory
poetry init
  2. Add Dependencies
poetry add llama-index 
poetry add llama-index-embeddings-ollama 
poetry add llama-index-llms-ollama 
poetry add llama-index-vector-stores-elasticsearch 
poetry add sentence-transformers 
poetry add streamlit 
poetry add asyncio
  3. Download a JSON conversations example
    • You can use a different dataset; just make sure you adapt the code to its keys (the expected shape is shown below).
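The ingestion code below reads the conversation_id and conversation keys from each item, so the file should look roughly like this (the values here are invented for illustration):
[
  {
    "conversation_id": 1,
    "conversation": "Agent: Hello, how can I help you today? Customer: I'd like to check the status of my order."
  },
  {
    "conversation_id": 2,
    "conversation": "Agent: Good morning. Customer: My latest invoice shows the wrong amount."
  }
]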

Ingestion Pipeline

  1. Set up the Elasticsearch client and vector store in the index.py file
es_client = AsyncElasticsearch("http://localhost:9200")

es_vector_store = ElasticsearchStore(
    index_name="calls",
    vector_field='conversation_vector',
    text_field='conversation',
    es_client=es_client  
    # You can pass the CLOUD ID and API KEY on the cloud approach
)
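As the comment notes, the store can also point at an Elastic Cloud deployment instead of a local client. A sketch of that variant, where the cloud ID and API key values are placeholders you would replace with your own:
# Cloud alternative (sketch): let ElasticsearchStore build the connection from Cloud ID + API key
es_vector_store = ElasticsearchStore(
    index_name="calls",
    vector_field="conversation_vector",
    text_field="conversation",
    es_cloud_id="<your-cloud-id>",   # placeholder
    es_api_key="<your-api-key>",     # placeholder
)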
  2. Create the document ingestion logic in index.py
import json
from llama_index.core import Document
from elasticsearch import AsyncElasticsearch
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.ingestion import IngestionPipeline
from llama_index.embeddings.ollama import OllamaEmbedding
from llama_index.vector_stores.elasticsearch import ElasticsearchStore

def get_documents_from_json(json_data) -> list:
    """
    This function receives a JSON object (Python dictionary) and returns a list of documents.
    """
    documents = [Document(text=item["conversation"], metadata={"conversation_id": item["conversation_id"]})
                 for item in json_data]
    return documents

def ingest_documents(documents):
    """
    This function ingests the documents into Elasticsearch
    """
    ollama_embedding = OllamaEmbedding("llama3")
    pipeline = IngestionPipeline(transformations=[SentenceSplitter(chunk_size=350, chunk_overlap=50), ollama_embedding],
                                 vector_store=es_vector_store)
    pipeline.run(documents=documents)
    print("Pipeline run completed")

Querying

  1. Set up the query engine in query.py
from llama_index.core import VectorStoreIndex, QueryBundle, Settings
from llama_index.embeddings.ollama import OllamaEmbedding
from llama_index.llms.ollama import Ollama
from index import es_vector_store


def setup_query_engine(model_name="llama3"):
    local_llm = Ollama(model=model_name)
    Settings.embed_model = OllamaEmbedding(model_name)
    index = VectorStoreIndex.from_vector_store(es_vector_store)
    return index.as_query_engine(local_llm, similarity_top_k=10)
  1. Create the execution function to be called by the interface in query.py
def execute_query(query_engine, query_text):
    bundle = QueryBundle(query_text, embedding=Settings.embed_model.get_query_embedding(query_text))
    result = query_engine.query(bundle)
    return result
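As a quick sanity check of the query path outside Streamlit, you could append something like this to query.py; the question string is just an example:
if __name__ == "__main__":
    # build the engine against the existing "calls" index and ask a test question
    query_engine = setup_query_engine()
    response = execute_query(query_engine, "What did the customers complain about?")
    print(response)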

Interface

  1. Create the Elasticsearch client
es_client = AsyncElasticsearch("http://localhost:9200")
  2. Add a function to check if the embeddings are already present in Elasticsearch
async def check_embeddings():
    try:
        index_info = await es_client.indices.exists(index="calls")
        if index_info:
            doc_count = await es_client.count(index="calls",
                                              body={"query": {"exists": {"field": "conversation_vector"}}})
            return doc_count['count'] > 0
        return False
    except Exception as e:
        st.error(f"Failed to connect or query Elasticsearch: {e}")
        return False
  3. Check if the embeddings are already present in Elasticsearch; if not, show the file uploader.
    embeddings_present = asyncio.run(check_embeddings())

    if not embeddings_present:
        json_file = st.file_uploader("Upload a JSON file with conversations", type=['json'])
        if json_file is not None:
            json_data = json.load(json_file)
            documents = get_documents_from_json(json_data)
            ingest_documents(documents)
    else:
        st.success("Embeddings are already present in Elasticsearch.")
  4. Instantiate the query engine
query_engine = setup_query_engine(model_name)
  5. Read the query and execute it
query_text = st.text_input("Ask a question about the conversations")
if query_text:
    result = execute_query(query_engine, query_text)
    st.write(str(result))
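For reference, here is how the fragments above could be assembled into a single Streamlit script. The filename app.py, the page title, and the input label are my own choices for illustration, not necessarily what the repo uses:
# app.py (assumed filename) -- illustrative assembly of the interface pieces above
import asyncio
import json

import streamlit as st
from elasticsearch import AsyncElasticsearch

from index import get_documents_from_json, ingest_documents
from query import execute_query, setup_query_engine

es_client = AsyncElasticsearch("http://localhost:9200")
model_name = "llama3"  # assumption: the model name is hard-coded here


async def check_embeddings():
    """Return True if the 'calls' index already holds embedded conversations."""
    try:
        if await es_client.indices.exists(index="calls"):
            doc_count = await es_client.count(
                index="calls",
                body={"query": {"exists": {"field": "conversation_vector"}}},
            )
            return doc_count["count"] > 0
        return False
    except Exception as e:
        st.error(f"Failed to connect or query Elasticsearch: {e}")
        return False


st.title("Conversations Q&A")  # illustrative title

embeddings_present = asyncio.run(check_embeddings())
if not embeddings_present:
    json_file = st.file_uploader("Upload a JSON file with conversations", type=["json"])
    if json_file is not None:
        json_data = json.load(json_file)
        ingest_documents(get_documents_from_json(json_data))
        st.success("Documents ingested.")
else:
    st.success("Embeddings are already present in Elasticsearch.")

query_engine = setup_query_engine(model_name)
query_text = st.text_input("Ask a question about the conversations")
if query_text:
    st.write(str(execute_query(query_engine, query_text)))
You would then start the app with streamlit run app.py.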

Execution

  1. Upload the JSON file and ingest the documents


  2. Check if the documents were ingested
curl -k http://localhost:9200/calls/_search

You should get back the indexed documents, confirming the ingestion worked.

  3. Write and execute the query

Next Steps


Written by Vitor Sousa
