Published on 2023-04-21 10:00 by Vitor Sousa
RAG with LlamaIndex, Elasticsearch and Llama3
Check the repo: 🔗 RAG, Llama3, and Elasticsearch project repository.
Goal
Implement a Q&A experience utilizing the Retrieval Augmented Generation (RAG) technique, with Elasticsearch as the vector database. For this implementation, we will employ LlamaIndex for indexing and retrieval, alongside a locally hosted Llama3.
- Local Setup: We run llama3 locally using Ollama.
- Data Ingestion:
  - We load the JSON file into ElasticsearchStore, a vector store backed by Elasticsearch.
  - During this process, we generate embeddings with the locally running llama3 model. These embeddings, along with the actual conversation texts, are stored in the LlamaIndex Elasticsearch vector store.
- Configuration:
  - We set up a LlamaIndex IngestionPipeline, integrating it with the local LLM, in this case llama3 served via Ollama.
- Query Execution:
  - When a query such as “Give me a summary of Billing Inquiries and Corrections conversations.” is submitted, Elasticsearch performs a semantic search to fetch relevant conversations.
  - These conversations, together with the original query, are fed into the locally running LLM to generate a response.
Steps
Create project
Ollama
ollama run llama3
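Before wiring anything else, you can sanity-check that the Ollama server is reachable (this assumes Ollama's default port 11434):
# Optional check: lists the locally available models; llama3 should appear
curl http://localhost:11434/api/tags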
Elasticsearch
- Get Elasticsearch up and running
docker run -p 9200:9200 -d --name elasticsearch \
-e "discovery.type=single-node" \
-e "xpack.security.enabled=false" \
-e "xpack.security.http.ssl.enabled=false" \
-e "xpack.license.self_generated.type=trial" \
docker.elastic.co/elasticsearch/elasticsearch:8.13.0
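Once the container is up, a quick request to port 9200 should return the cluster info (security is disabled in this setup, so no credentials are needed):
# The response should include the cluster name and version 8.13.0
curl http://localhost:9200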
I set up Elasticsearch in Docker, but it is also possible to use a cloud deployment; check it here.
Application
- Install & Initialize Poetry
cd your-project-directory
poetry init
- Add Dependencies
poetry add llama-index
poetry add llama-index-embeddings-ollama
poetry add llama-index-llms-ollama
poetry add llama-index-vector-stores-elasticsearch
poetry add sentence-transformers
poetry add streamlit
poetry add asyncio
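As an optional sanity check that the environment resolved correctly, you can import one of the core classes used later:
poetry run python -c "from llama_index.core import Document; print('llama-index OK')"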
- Download a JSON conversations example
- It could be a different dataset; just make sure you adapt the code to your JSON keys (a sample of the expected shape is shown below).
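For reference, the ingestion code below reads a conversation_id and a conversation field from each item; a minimal example of that shape (the values here are made up):
[
  {
    "conversation_id": "1",
    "conversation": "Agent: Hello, thanks for calling. How can I help you today? Customer: I think I was billed twice for last month..."
  },
  {
    "conversation_id": "2",
    "conversation": "Agent: Good afternoon. Customer: I'd like to correct the address on my invoice..."
  }
]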
Ingestion Pipeline
- Elasticsearch client setup in the index.py file
es_client = AsyncElasticsearch("http://localhost:9200")

es_vector_store = ElasticsearchStore(
    index_name="calls",
    vector_field='conversation_vector',
    text_field='conversation',
    es_client=es_client,
    # You can pass the CLOUD ID and API KEY on the cloud approach
)
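If you go with the cloud deployment instead of Docker, the store can be pointed at it directly. A sketch, assuming the es_cloud_id and es_api_key parameters of ElasticsearchStore and placeholder credentials from your Elastic Cloud deployment:

# Cloud alternative (sketch): the credentials below are placeholders
es_vector_store = ElasticsearchStore(
    index_name="calls",
    vector_field='conversation_vector',
    text_field='conversation',
    es_cloud_id="<your-cloud-id>",
    es_api_key="<your-api-key>",
)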
- Create the document ingestion logic in index.py
import json
from llama_index.core import Document
from elasticsearch import AsyncElasticsearch
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.ingestion import IngestionPipeline
from llama_index.embeddings.ollama import OllamaEmbedding
from llama_index.vector_stores.elasticsearch import ElasticsearchStore
def get_documents_from_json(json_data) -> list:
    """
    This function receives the parsed JSON data (a list of conversation items)
    and returns a list of Document objects.
    """
    documents = [
        Document(text=item["conversation"], metadata={"conversation_id": item["conversation_id"]})
        for item in json_data
    ]
    return documents


def ingest_documents(documents):
    """
    This function ingests the documents into Elasticsearch
    """
    ollama_embedding = OllamaEmbedding("llama3")
    pipeline = IngestionPipeline(
        transformations=[SentenceSplitter(chunk_size=350, chunk_overlap=50), ollama_embedding],
        vector_store=es_vector_store,
    )
    pipeline.run(documents=documents)
    print("Pipeline run completed")
- LlamaIndex IngestionPipeline
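To run the ingestion end to end from the command line, a minimal driver can be added to index.py. This is a sketch; the file name conversations.json is an assumption, use whatever you downloaded:

# Minimal driver (sketch): load the JSON file and push it through the pipeline
if __name__ == "__main__":
    with open("conversations.json") as f:  # assumed file name
        json_data = json.load(f)
    documents = get_documents_from_json(json_data)
    ingest_documents(documents)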
Querying
- Set up the query engine in query.py
from llama_index.core import VectorStoreIndex, QueryBundle, Settings
from llama_index.embeddings.ollama import OllamaEmbedding
from llama_index.llms.ollama import Ollama
from index import es_vector_store
def setup_query_engine(model_name="llama3"):
    local_llm = Ollama(model=model_name)
    Settings.embed_model = OllamaEmbedding(model_name)
    index = VectorStoreIndex.from_vector_store(es_vector_store)
    return index.as_query_engine(local_llm, similarity_top_k=10)
- Create the execution function to be called by the interface in query.py
def execute_query(query_engine, query_text):
    bundle = QueryBundle(query_text, embedding=Settings.embed_model.get_query_embedding(query_text))
    result = query_engine.query(bundle)
    return result
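A quick way to try the query path outside the interface, once documents have been ingested (a sketch; the question is just an example):

# Standalone check of retrieval + generation
if __name__ == "__main__":
    engine = setup_query_engine("llama3")
    answer = execute_query(engine, "Give me a summary of Billing Inquiries and Corrections conversations.")
    print(answer)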
Interface
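The interface snippets below live in a single Streamlit script (the file name is up to you, e.g. app.py); the imports they rely on, inferred from the code that follows, are roughly:

import asyncio
import json

import streamlit as st
from elasticsearch import AsyncElasticsearch

from index import get_documents_from_json, ingest_documents
from query import setup_query_engine, execute_query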
- Create the Elasticsearch client
es_client = AsyncElasticsearch("http://localhost:9200")
- Function to check if the embeddings are already present in Elasticsearch
async def check_embeddings():
    try:
        index_info = await es_client.indices.exists(index="calls")
        if index_info:
            doc_count = await es_client.count(
                index="calls",
                body={"query": {"exists": {"field": "conversation_vector"}}},
            )
            return doc_count['count'] > 0
        return False
    except Exception as e:
        st.error(f"Failed to connect or query Elasticsearch: {e}")
        return False
- Check if the embeddings are already present in Elasticsearch; if not, show the file uploader.
embeddings_present = asyncio.run(check_embeddings())
if not embeddings_present:
    json_file = st.file_uploader("Upload a JSON file with conversations", type=['json'])
    if json_file is not None:
        json_data = json.load(json_file)
        documents = get_documents_from_json(json_data)
        ingest_documents(documents)
else:
    st.success("Embeddings are already present in Elasticsearch.")
- Instantiate the query engine
model_name = "llama3"
query_engine = setup_query_engine(model_name)
- Read the query, execute it, and display the answer
query_text = st.text_input("Ask a question about the ingested conversations")
if query_text:
    result = execute_query(query_engine, query_text)
    st.write(result.response)
Execution
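To launch the interface (assuming the Streamlit script is named app.py, as above):
poetry run streamlit run app.py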
- Upload the JSON file and ingest the documents
- Check if the documents were ingested
curl -k http://localhost:9200/calls/_search
You should get back the indexed documents, with the conversation text and the conversation_vector embedding stored in _source.
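An illustrative excerpt of that response (values are abbreviated and bookkeeping fields are omitted; the exact shape depends on your data):
{
  "hits": {
    "hits": [
      {
        "_index": "calls",
        "_source": {
          "conversation": "Agent: Hello, thanks for calling...",
          "conversation_vector": [0.0123, -0.0456, "..."]
        }
      }
    ]
  }
}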
- Write and execute the query
Next Steps
- 🎯 Make the model dynamic
- 🎯 Make the number of documents adaptable in the interface
- 🎯 Create logic to clean the Elasticsearch and dynamically change the documents of reference
- 🎯 Be able to process file types other than JSON
Written by Vitor Sousa