Published on Apr 21, 2023 by Vitor Sousa

RAG with LlamaIndex, Elasticsearch and Llama3

Check the repo: 🔗 RAG, Llama3, and Elasticsearch project repository.

Goal

Implement a Q&A experience utilizing the Retrieval Augmented Generation (RAG) technique, with Elasticsearch as the vector database. For this implementation, we will employ LlamaIndex for indexing and retrieval, alongside a locally hosted Llama3.

  • Local Setup: We run Llama3 locally using Ollama.
  • Data Ingestion:
    • We load the JSON file into ElasticsearchStore, a vector store backed by Elasticsearch.
    • During this process, we generate embeddings with the locally running llama3 model. These embeddings, along with the actual conversation texts, are stored in the LlamaIndex Elasticsearch vector store.
  • Configuration:
    • We set up a LlamaIndex IngestionPipeline, integrating it with the local LLM, specifically the Llama3 model served via Ollama.
  • Query Execution:
    • When a query such as “Give me a summary of Billing Inquiries and Corrections conversations.” is submitted, Elasticsearch performs a semantic search to fetch relevant conversations.
    • These conversations, together with the original query, are fed into the locally running LLM to generate a response.

(Architecture diagram of the RAG workflow combining LlamaIndex, Elasticsearch, and Llama3.)

Steps

Create project

Ollama

  1. Download and install Ollama
    1. Run the llama3 model (by default this pulls the 8B variant):
    ollama run llama3
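
Optionally, you can confirm that Ollama is serving the model before moving on. A minimal sanity check against the Ollama HTTP API, assuming its default address http://localhost:11434 and using only the standard library:

# Optional sanity check: list the models Ollama has pulled locally.
# Assumes Ollama's default API address, http://localhost:11434.
import json
from urllib.request import urlopen

with urlopen("http://localhost:11434/api/tags") as resp:
    models = [m["name"] for m in json.load(resp)["models"]]

print(models)  # expect an entry like 'llama3:latest'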

Elasticsearch

  1. Get Elasticsearch up and running
docker run -p 9200:9200 -d --name elasticsearch \
  -e "discovery.type=single-node" \
  -e "xpack.security.enabled=false" \
  -e "xpack.security.http.ssl.enabled=false" \
  -e "xpack.license.self_generated.type=trial" \
  docker.elastic.co/elasticsearch/elasticsearch:8.13.0

I set up Elasticsearch in Docker, but it is also possible to use a cloud deployment instead; check it here.
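
Before moving on, it can also help to confirm the container is reachable. A minimal connectivity check, a sketch using the same async client the project relies on later and assuming the default http://localhost:9200:

# Optional connectivity check for the local Elasticsearch container.
import asyncio
from elasticsearch import AsyncElasticsearch

async def ping_es():
    es = AsyncElasticsearch("http://localhost:9200")
    info = await es.info()
    print(info["version"]["number"])  # 8.13.0 for the image used above
    await es.close()

asyncio.run(ping_es())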

Application

  1. Install & Initialize Poetry
cd your-project-directory
poetry init
  2. Add Dependencies
poetry add llama-index 
poetry add llama-index-embeddings-ollama 
poetry add llama-index-llms-ollama 
poetry add llama-index-vector-stores-elasticsearch 
poetry add sentence-transformers 
poetry add streamlit 
poetry add asyncio
  3. Download a JSON conversations example
    • It could be another dataset; just make sure you adapt the code to your JSON keys (a sample of the expected shape follows below).
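
For reference, the ingestion code below expects each record to carry a conversation_id and a conversation field. A minimal illustration of that shape (the content is invented purely as an example):

[
  {
    "conversation_id": "1",
    "conversation": "Customer: I think I was double charged on my last invoice. Agent: Let me check that for you..."
  },
  {
    "conversation_id": "2",
    "conversation": "Customer: Can you correct the billing address on my account? Agent: Of course..."
  }
]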

Ingestion Pipeline

  1. Elasticsearch client setup in the index.py file
es_client = AsyncElasticsearch("http://localhost:9200")

es_vector_store = ElasticsearchStore(
    index_name="calls",
    vector_field='conversation_vector',
    text_field='conversation',
    es_client=es_client
    # For a cloud deployment, pass the Cloud ID and API key instead (see the sketch below)
)
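
If you went with the Elastic Cloud deployment instead of the local container, the store can be pointed at it directly. A sketch, assuming the es_cloud_id and es_api_key parameters of ElasticsearchStore and placeholder credentials:

# Cloud alternative (sketch): placeholder Cloud ID and API key instead of a local client.
es_vector_store = ElasticsearchStore(
    index_name="calls",
    vector_field="conversation_vector",
    text_field="conversation",
    es_cloud_id="YOUR_CLOUD_ID",
    es_api_key="YOUR_API_KEY",
)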
  2. Create the document ingestion logic in index.py
import json
from llama_index.core import Document
from elasticsearch import AsyncElasticsearch
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.ingestion import IngestionPipeline
from llama_index.embeddings.ollama import OllamaEmbedding
from llama_index.vector_stores.elasticsearch import ElasticsearchStore
def get_documents_from_json(json_data) -> list:
    """
    This function receives the parsed JSON data (a list of conversation records) and returns a list of Documents.
    """
    documents = [Document(text=item["conversation"], metadata={"conversation_id": item["conversation_id"]})
                 for item in json_data]
    return documents

def ingest_documents(documents):
    """
    This function ingests the documents into Elasticsearch.
    """
    # Chunk each conversation, embed the chunks with the local llama3 model,
    # and write the text plus embeddings into the Elasticsearch vector store.
    ollama_embedding = OllamaEmbedding("llama3")
    pipeline = IngestionPipeline(
        transformations=[SentenceSplitter(chunk_size=350, chunk_overlap=50), ollama_embedding],
        vector_store=es_vector_store,
    )
    pipeline.run(documents=documents)
    print("Pipeline run completed")
  • LlamaIndex IngestionPipeline
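
To tie the two functions together, a small driver at the bottom of index.py can load the file and run the pipeline once. A minimal sketch, where sample_conversations.json is a placeholder for whatever file you downloaded:

# Example driver (sketch): load the JSON file and ingest it once.
if __name__ == "__main__":
    with open("sample_conversations.json") as f:  # placeholder file name
        json_data = json.load(f)
    documents = get_documents_from_json(json_data)
    ingest_documents(documents)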

Querying

  1. Set up the query engine in query.py
from llama_index.core import VectorStoreIndex, QueryBundle, Settings
from llama_index.embeddings.ollama import OllamaEmbedding
from llama_index.llms.ollama import Ollama
from index import es_vector_store


def setup_query_engine(model_name="llama3"):
    # Use the local Ollama model both as the LLM and for query embeddings,
    # and build the index on top of the existing Elasticsearch vector store.
    local_llm = Ollama(model=model_name)
    Settings.embed_model = OllamaEmbedding(model_name)
    index = VectorStoreIndex.from_vector_store(es_vector_store)
    return index.as_query_engine(llm=local_llm, similarity_top_k=10)
  2. Create the execution function to be called by the interface in query.py
def execute_query(query_engine, query_text):
    # Embed the query text explicitly so retrieval uses the same Ollama embeddings.
    bundle = QueryBundle(query_text, embedding=Settings.embed_model.get_query_embedding(query_text))
    result = query_engine.query(bundle)
    return result
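
Before wiring these into the interface, they can be exercised directly from the command line. A quick sketch for the bottom of query.py, reusing the example question from the goal section:

# Quick manual test (sketch): build the engine and ask one question.
if __name__ == "__main__":
    engine = setup_query_engine("llama3")
    result = execute_query(engine, "Give me a summary of Billing Inquiries and Corrections conversations.")
    print(result.response)  # the generated answer text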

Interface

  1. Create the Elasticsearch client
es_client = AsyncElasticsearch("http://localhost:9200")
  2. Create a function that checks whether the embeddings are already present in Elasticsearch
async def check_embeddings():
    try:
        index_info = await es_client.indices.exists(index="calls")
        if index_info:
            doc_count = await es_client.count(index="calls",
                                              body={"query": {"exists": {"field": "conversation_vector"}}})
            return doc_count['count'] > 0
        return False
    except Exception as e:
        st.error(f"Failed to connect or query Elasticsearch: {e}")
        return False
  3. Check whether the embeddings are already present in Elasticsearch; if not, show the file uploader.
    embeddings_present = asyncio.run(check_embeddings())

    if not embeddings_present:
        json_file = st.file_uploader("Upload a JSON file with conversations", type=['json'])
        if json_file is not None:
            # Only parse the file once the user has actually uploaded something
            json_data = json.load(json_file)
            documents = get_documents_from_json(json_data)
            ingest_documents(documents)
    else:
        st.success("Embeddings are already present in Elasticsearch.")
  4. Instantiate the query engine
query_engine = setup_query_engine(model_name)  # model_name e.g. "llama3"
  5. Read the query and execute it (the complete interface assembly is sketched below)
query_text = st.text_input("")
if query_text:  # only run the query once the user has typed something
    result = execute_query(query_engine, query_text)
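
Putting the interface pieces together, the Streamlit app roughly follows the skeleton below. This is a sketch of how the snippets fit into one file: the file name app.py and the title text are my assumptions, not necessarily what the repository uses.

# app.py (sketch): the interface snippets above assembled into one file.
import asyncio
import json

import streamlit as st
from elasticsearch import AsyncElasticsearch

from index import get_documents_from_json, ingest_documents
from query import setup_query_engine, execute_query

es_client = AsyncElasticsearch("http://localhost:9200")


async def check_embeddings():
    # Same helper as defined above: True if "calls" already holds embedded documents.
    try:
        index_info = await es_client.indices.exists(index="calls")
        if index_info:
            doc_count = await es_client.count(
                index="calls", body={"query": {"exists": {"field": "conversation_vector"}}}
            )
            return doc_count["count"] > 0
        return False
    except Exception as e:
        st.error(f"Failed to connect or query Elasticsearch: {e}")
        return False


st.title("RAG over conversations (Llama3 + Elasticsearch)")  # title text is an assumption

embeddings_present = asyncio.run(check_embeddings())
if not embeddings_present:
    json_file = st.file_uploader("Upload a JSON file with conversations", type=["json"])
    if json_file is not None:
        documents = get_documents_from_json(json.load(json_file))
        ingest_documents(documents)
else:
    st.success("Embeddings are already present in Elasticsearch.")

model_name = "llama3"  # hard-coded here; making it dynamic is listed under Next Steps
query_engine = setup_query_engine(model_name)

query_text = st.text_input("")  # empty label, as in the snippet above
if query_text:
    result = execute_query(query_engine, query_text)
    st.write(result.response)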

Execution

  1. Upload the JSON file and ingest the documents

  2. Check that the documents were ingested:
curl -k http://localhost:9200/calls/_search

You should get a response listing the ingested documents.
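
If you prefer to check from Python instead of curl, a small sketch with the same async client counts the indexed documents:

# Optional: count how many documents ended up in the "calls" index.
import asyncio
from elasticsearch import AsyncElasticsearch

async def count_ingested():
    es = AsyncElasticsearch("http://localhost:9200")
    resp = await es.count(index="calls")
    print(resp["count"])  # should match the number of chunks produced by the splitter
    await es.close()

asyncio.run(count_ingested())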

  3. Write and execute the query.

Next Steps

  • 🎯 Make the model dynamic
  • 🎯 Make the number of documents adaptable in the interface
  • 🎯 Create logic to clean the Elasticsearch index and swap the reference documents dynamically
  • 🎯 Be able to process file types other than JSON
