MLOps for RAG: End-to-End Pipeline
Basic Steps
- Data Collection and Preprocessing: Automate data collection, preprocessing, and feature extraction using Airflow (see the DAG sketch after this list).
- Model Training: Train models using Kubeflow Pipelines, tracking experiments with MLflow.
- Model Storage and Versioning: Store models in the MLflow Model Registry and artifacts in S3.
- Deployment: Deploy models using Seldon Core, expose APIs via AWS API Gateway.
- Monitoring and Logging: Monitor performance with Prometheus and Grafana, log data with ELK Stack.
- CI/CD: Automate deployments with GitHub Actions, manage infrastructure with Terraform.
- RAG Specific: Use Pinecone for vector search, fine-tune LLMs with PyTorch, and integrate with Haystack for the RAG pipeline.
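To make the orchestration concrete, here is a minimal Airflow DAG sketch wiring these stages together. The DAG id and task callables are hypothetical placeholders, not part of any specific project.

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# Hypothetical callables standing in for the real pipeline stages
def collect_data(): ...
def preprocess(): ...
def train_and_register(): ...
def deploy(): ...

with DAG(dag_id="rag_mlops_pipeline",
         start_date=datetime(2024, 1, 1),
         schedule_interval="@daily",
         catchup=False) as dag:
    collect = PythonOperator(task_id="collect_data", python_callable=collect_data)
    process = PythonOperator(task_id="preprocess", python_callable=preprocess)
    train = PythonOperator(task_id="train_and_register", python_callable=train_and_register)
    deploy_task = PythonOperator(task_id="deploy", python_callable=deploy)
    collect >> process >> train >> deploy_task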
ExtractiveQAPipeline:
- An extractive QA pipeline extracts answers directly from the context provided in the documents. It typically involves a retriever to fetch relevant documents and a reader to pinpoint the exact answer span within those documents (a minimal stock example follows this list).
- Other options:
- Hybrid QA Pipeline: Combines both extractive and generative approaches
- Generative QA Pipeline: Uses generative models (like GPT-3)
- Extractive Reader: This component extracts precise answers from documents by identifying spans of text that directly answer the query. It uses models like BERT or RoBERTa trained for extractive question answering tasks.
- Generative Reader: This component generates answers in natural language, which can be more flexible and produce more nuanced responses. It typically uses models like GPT-3, T5, or other generative transformers trained for QA tasks.
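For contrast with the custom reader built later in these notes, a minimal sketch of the stock Haystack setup, assuming Haystack 1.15+ (where InMemoryDocumentStore supports BM25) and a store already filled with documents:

from haystack.document_stores import InMemoryDocumentStore
from haystack.nodes import BM25Retriever, FARMReader
from haystack.pipelines import ExtractiveQAPipeline

# Assumes documents were already written to the store
document_store = InMemoryDocumentStore(use_bm25=True)
retriever = BM25Retriever(document_store=document_store)
reader = FARMReader(model_name_or_path="deepset/roberta-base-squad2")

pipeline = ExtractiveQAPipeline(reader, retriever)
result = pipeline.run(query="Who designed the bridge?",
                      params={"Retriever": {"top_k": 10}, "Reader": {"top_k": 3}})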
Dense Passage Retriever (DPR):
- DPR uses neural embeddings to represent both questions and documents.
- Both questions and documents are encoded into dense vectors in the same latent space, so relevance can be scored by vector similarity (see the sketch after this list).
- Other options:
- BM25 (Best Matching 25)
- TF-IDF (Term Frequency-Inverse Document Frequency)
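A minimal sketch of the dense idea using sentence-transformers rather than DPR's exact encoders: the question and the passages are embedded into one space and ranked by cosine similarity.

from sentence_transformers import SentenceTransformer, util

model = SentenceTransformer('paraphrase-MiniLM-L6-v2')
question = "When was the foundation poured?"
passages = ["The foundation was poured in March 2021.",
            "Steel framing began in June."]

# Encode the question and passages into the same latent space
q_emb = model.encode(question, convert_to_tensor=True)
p_embs = model.encode(passages, convert_to_tensor=True)

# Cosine similarity ranks the passages against the question
print(util.cos_sim(q_emb, p_embs))  # higher score -> more relevant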
Haystack:
- Haystack is an open-source framework for building end-to-end search and question-answering (QA) systems.
- Components: document stores, retrievers, and readers
- Other options:
- Hugging Face Transformers: Using Hugging Face’s pre-trained models directly (see the sketch after this list)
- OpenAI GPT Models: Directly using OpenAI’s API for question answering tasks
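The Hugging Face option in particular is nearly a one-liner. A minimal sketch using the transformers question-answering pipeline; the model name is a common public extractive-QA checkpoint, not a project requirement.

from transformers import pipeline

# Extractive QA directly from a pre-trained model, no framework around it
qa = pipeline("question-answering", model="distilbert-base-cased-distilled-squad")
result = qa(question="What is the deadline?",
            context="The contractor must complete the inspection by July 15, 2024.")
print(result)  # {'answer': 'July 15, 2024', 'score': ..., 'start': ..., 'end': ...}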
MLOps stack
- Data Ingestion
- Apache Airflow, AWS S3
- Data Storage
- S3 Bucket
- Data Processing and Feature Engineering
- Pandas/Spark
- Model Training and Validation
- MLflow
- Model Versioning and Registry
- MLflow Model Registry
- Model Serving
- Seldon Core
- Monitoring and Logging
- ELK Stack
- CI/CD
- Jenkins
- RAG components
- Pinecone for Vector Database
- Haystack for Document Retrieval and Question Answering
- LangChain for Chain of Components
Dummy code for reference
Index processed data into Pinecone
import boto3
import pandas as pd
import pinecone
from sentence_transformers import SentenceTransformer

# Initialize Pinecone (v2-style client)
pinecone.init(api_key='pinecone-api-key', environment='us-west1-gcp')
index_name = 'construction-docs'
if index_name not in pinecone.list_indexes():
    # paraphrase-MiniLM-L6-v2 produces 384-dimensional embeddings
    pinecone.create_index(index_name, dimension=384, metric='cosine')
index = pinecone.Index(index_name)

# Initialize Sentence Transformer model
model = SentenceTransformer('paraphrase-MiniLM-L6-v2')

# Load processed data from S3
s3_client = boto3.client('s3')
s3_client.download_file('bucketname', 'processed/processed_data.csv', '/tmp/processed_data.csv')
processed_data = pd.read_csv('/tmp/processed_data.csv')

# Convert data to documents and embeddings ('column_name' is a placeholder)
documents = processed_data['column_name'].astype(str).tolist()
embeddings = model.encode(documents).tolist()

# Prepare documents for Pinecone and upsert them into the index
pinecone_docs = [{'id': str(i), 'values': embedding, 'metadata': {'text': doc}}
                 for i, (embedding, doc) in enumerate(zip(embeddings, documents))]
index.upsert(pinecone_docs)
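Once the documents are upserted, retrieval is a single call. A minimal sketch against the same index, assuming the v2-style Pinecone client used above:

# Embed the question with the same model used for indexing
query_embedding = model.encode("What are the site safety requirements?").tolist()

# Fetch the 5 nearest documents along with their stored text
results = index.query(vector=query_embedding, top_k=5, include_metadata=True)
for match in results['matches']:
    print(match['score'], match['metadata']['text'])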
Integrate the RAG Pipeline with Preprocessing and Model Training
# Data processing and saving processed data to S3 (dummy code; reading
# s3:// paths with pandas requires the s3fs package)
import pandas as pd

data = pd.read_csv('s3://bucket/data/data.csv')
processed_data = data[data['example_column'] > 1]
processed_data.to_csv('s3://bucket/processed/processed_data.csv', index=False)
# Train and log model with MLflow
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

def train_model():
    data = pd.read_csv('s3://bucket/processed/processed_data.csv')
    X = data.drop('target', axis=1)
    y = data['target']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
    with mlflow.start_run():
        model = RandomForestClassifier()
        model.fit(X_train, y_train)
        accuracy = model.score(X_test, y_test)
        mlflow.log_metric("accuracy", accuracy)
        mlflow.sklearn.log_model(model, "model")

train_model()
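The serving code further down loads "models:/rag_model/production" from the MLflow Model Registry, so the logged model must be registered and promoted first. A minimal sketch, assuming the run above logged the model under the artifact path "model" (<run_id> is a placeholder for that run's id):

import mlflow
from mlflow.tracking import MlflowClient

# Register the model logged by train_model() under a named registry entry
result = mlflow.register_model("runs:/<run_id>/model", "rag_model")

# Promote the new version so "models:/rag_model/production" resolves to it
client = MlflowClient()
client.transition_model_version_stage(name="rag_model",
                                      version=result.version,
                                      stage="Production")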
RAG pipeline
- Load your custom-trained model from MLflow.
- Create a custom reader by subclassing BaseReader in Haystack.
- Initialize Pinecone Document Store using Haystack.
- Use DensePassageRetriever for retrieving documents.
- Create an ExtractiveQAPipeline using the custom reader and retriever.
- Ask questions to the pipeline and get answers.
import torch
from haystack.nodes import BaseReader  # Haystack 1.x

class CustomReader(BaseReader):
    def __init__(self, model, tokenizer):
        super().__init__()
        self.model = model
        self.tokenizer = tokenizer

    # NOTE: dummy code -- a full BaseReader subclass must also implement
    # predict_batch(), and Haystack expects Answer objects rather than dicts
    def predict(self, query, documents, top_k=10):
        results = []
        for doc in documents:
            inputs = self.tokenizer.encode_plus(query, doc.content, return_tensors="pt")
            with torch.no_grad():
                outputs = self.model(**inputs)
            start_scores, end_scores = outputs.start_logits, outputs.end_logits
            # Most likely beginning and end of the answer span (argmax of the scores)
            answer_start = torch.argmax(start_scores)
            answer_end = torch.argmax(end_scores) + 1
            answer = self.tokenizer.convert_tokens_to_string(
                self.tokenizer.convert_ids_to_tokens(
                    inputs["input_ids"][0][answer_start:answer_end]
                )
            )
            results.append({"answer": answer, "score": start_scores.max().item()})
        return sorted(results, key=lambda x: x['score'], reverse=True)[:top_k]
from haystack.document_stores import PineconeDocumentStore  # Haystack 1.x import paths
from haystack.nodes import DensePassageRetriever
from haystack.pipelines import ExtractiveQAPipeline, Pipeline

# Initialize Pinecone Document Store (embedding_dim must match the
# retriever; DPR produces 768-dimensional vectors)
document_store = PineconeDocumentStore(api_key='your-pinecone-api-key',
                                       environment='us-west1-gcp',
                                       index='construction-docs',
                                       embedding_dim=768)

# Initialize Dense Passage Retriever (defaults to the facebook/dpr encoders)
retriever = DensePassageRetriever(document_store=document_store)

# Custom reader wrapping the trained model
reader = CustomReader(model=model, tokenizer=tokenizer)
pipeline = ExtractiveQAPipeline(reader, retriever)
""""
Hybrid pipeline:
# Create a Hybrid Pipeline
pipeline = Pipeline()
pipeline.add_node(component=retriever, name="DenseRetriever", inputs=["Query"])
pipeline.add_node(component=reader, name="ExtractiveReader", inputs=["DenseRetriever"])
pipeline.add_node(component=rag_generator, name="GenerativeReader", inputs=["DenseRetriever"])
""""
# Ask a Question
prediction = pipeline.run(query="What is the best learning practice?",
                          params={"Retriever": {"top_k": 10}, "Reader": {"top_k": 5}})
print(prediction)
Flask REST API Implementation
from flask import Flask, request, jsonify
import mlflow.pytorch
import torch
from transformers import AutoTokenizer
import pinecone
from haystack.document_stores import PineconeDocumentStore  # Haystack 1.x import paths
from haystack.nodes import DensePassageRetriever
from haystack.pipelines import Pipeline
app = Flask(__name__)
# Load model from the MLflow Model Registry (Production stage)
model_uri = "models:/rag_model/production"
model = mlflow.pytorch.load_model(model_uri)
tokenizer = AutoTokenizer.from_pretrained("model-tokenizer")  # placeholder tokenizer name
# NOTE: dummy code -- to run as a node inside a Haystack Pipeline, this class
# would need to subclass BaseComponent; the logic is shown bare for brevity
class CustomQAReader:
    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer

    def predict(self, query, documents, top_k=10):
        results = []
        for doc in documents:
            inputs = self.tokenizer.encode_plus(query, doc.content, return_tensors="pt")
            with torch.no_grad():
                outputs = self.model(**inputs)
            start_scores, end_scores = outputs.start_logits, outputs.end_logits
            # Extract the most likely answer span
            answer_start = torch.argmax(start_scores)
            answer_end = torch.argmax(end_scores) + 1
            extractive_answer = self.tokenizer.convert_tokens_to_string(
                self.tokenizer.convert_ids_to_tokens(
                    inputs["input_ids"][0][answer_start:answer_end]
                )
            )
            results.append({"extractive_answer": extractive_answer,
                            "score": start_scores.max().item()})
        return sorted(results, key=lambda x: x['score'], reverse=True)[:top_k]
# Initialize Pinecone and the document store (embedding_dim must match DPR's 768)
pinecone.init(api_key='pinecone-api-key', environment='us-west1-gcp')
document_store = PineconeDocumentStore(api_key='pinecone-api-key',
                                       environment='us-west1-gcp',
                                       index='docs',
                                       embedding_dim=768)

# Initialize Dense Passage Retriever
retriever = DensePassageRetriever(document_store=document_store)

# Initialize Custom Reader
reader = CustomQAReader(model=model, tokenizer=tokenizer)

# Build the retrieval + reading pipeline
pipeline = Pipeline()
pipeline.add_node(component=retriever, name="DenseRetriever", inputs=["Query"])
pipeline.add_node(component=reader, name="QAReader", inputs=["DenseRetriever"])
@app.route('/query', methods=['POST'])
def query():
    data = request.get_json()
    user_query = data['query']
    result = pipeline.run(query=user_query,
                          params={"DenseRetriever": {"top_k": 10}, "QAReader": {"top_k": 3}})
    return jsonify(result)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
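A quick way to exercise the endpoint once the service is running locally (a sketch using the requests library):

import requests

# POST a question to the local Flask service
response = requests.post("http://localhost:5000/query",
                         json={"query": "What is the best learning practice?"})
print(response.json())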