MLOps for RAG: End-to-End Pipeline


Basic Steps

  1. Data Collection and Preprocessing: Automate data collection, preprocessing, and feature extraction using Airflow (see the DAG sketch after this list).
  2. Model Training: Train models using Kubeflow Pipelines, tracking experiments with MLflow.
  3. Model Storage and Versioning: Store models in the MLflow Model Registry and artifacts in S3.
  4. Deployment: Deploy models using Seldon Core and expose APIs via AWS API Gateway.
  5. Monitoring and Logging: Monitor performance with Prometheus and Grafana, log data with the ELK Stack.
  6. CI/CD: Automate deployments with GitHub Actions, manage infrastructure with Terraform.
  7. RAG Specific: Use Pinecone for vector search, fine-tune LLMs with PyTorch, and integrate with Haystack for the RAG pipeline.
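
As a rough illustration of step 1, a minimal Airflow DAG could schedule a single preprocessing task. The DAG id, bucket paths, and the preprocess_data callable below are placeholders, not part of any existing pipeline.

from datetime import datetime

import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator

def preprocess_data():
    # Hypothetical preprocessing: read raw data from S3, drop incomplete rows, write back
    raw = pd.read_csv('s3://bucket/data/data.csv')
    processed = raw.dropna()
    processed.to_csv('s3://bucket/processed/processed_data.csv', index=False)

# Daily DAG with one preprocessing task
with DAG(dag_id='rag_data_pipeline', start_date=datetime(2024, 1, 1),
         schedule_interval='@daily', catchup=False) as dag:
    preprocess_task = PythonOperator(task_id='preprocess_data',
                                     python_callable=preprocess_data)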

ExtractiveQAPipeline:

  • Extractive QA pipeline extracts answers directly from the context provided in the documents. It typically involves a retriever to fetch relevant documents and a reader to pinpoint the exact answer span within these documents.
  • Other options:
    • Hybrid QA Pipeline: Combines both extractive and generative approaches
    • Generative QA Pipeline: Uses generative models (like GPT-3)
  • Extractive Reader: This component extracts precise answers from documents by identifying spans of text that directly answer the query. It uses models like BERT or RoBERTa trained for extractive question answering (see the sketch after this list).
  • Generative Reader: This component generates answers in natural language, which can be more flexible and produce more nuanced responses. It typically uses models like GPT-3, T5, or other generative transformers trained for QA tasks.
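
The extractive reader idea can be sketched on its own, without a retriever, assuming a public Hugging Face checkpoint (deepset/roberta-base-squad2) and a made-up context:

from transformers import pipeline

# Extractive QA: the model returns a span copied from the context, not free-form text
qa = pipeline('question-answering', model='deepset/roberta-base-squad2')

context = 'Dense Passage Retrieval encodes questions and passages into the same vector space.'
result = qa(question='What does Dense Passage Retrieval encode?', context=context)
print(result['answer'], result['score'])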

Dense Passage Retriever (DPR):

  • DPR uses neural embeddings to represent both questions and documents.
  • Both questions and documents are encoded into dense vectors in the same latent space (see the sketch after this list).
  • Other options:
    • BM25 (Best Matching 25)
    • TF-IDF (Term Frequency-Inverse Document Frequency)
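
A minimal sketch of the dense-retrieval idea using sentence-transformers (the same model used in the indexing code further down); the documents here are invented examples:

from sentence_transformers import SentenceTransformer, util

model = SentenceTransformer('paraphrase-MiniLM-L6-v2')

# Encode documents and query into the same dense vector space
docs = ['Concrete curing typically takes about 28 days.',
        'Steel beams carry the structural load.']
doc_embeddings = model.encode(docs, convert_to_tensor=True)
query_embedding = model.encode('How long does concrete take to cure?', convert_to_tensor=True)

# Rank documents by cosine similarity to the query
scores = util.cos_sim(query_embedding, doc_embeddings)[0]
print(docs[int(scores.argmax())])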

Haystack:

  • Haystack is an open-source framework for building end-to-end search and question-answering (QA) systems.
  • Components: document stores, retrievers, and readers (a minimal usage sketch follows this list)
  • Other options:
    • Hugging Face Transformers: Using Hugging Face’s pre-trained models directly
    • OpenAI GPT Models: Directly using OpenAI’s API for question answering tasks
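
A minimal end-to-end Haystack sketch, assuming farm-haystack 1.x, an in-memory document store, and invented document content:

from haystack.document_stores import InMemoryDocumentStore
from haystack.nodes import TfidfRetriever, FARMReader
from haystack.pipelines import ExtractiveQAPipeline

# Document store + retriever + reader are the three core components
document_store = InMemoryDocumentStore()
document_store.write_documents([{'content': 'Concrete curing typically takes about 28 days.'}])

retriever = TfidfRetriever(document_store=document_store)
reader = FARMReader(model_name_or_path='deepset/roberta-base-squad2')

pipeline = ExtractiveQAPipeline(reader, retriever)
result = pipeline.run(query='How long does concrete curing take?',
                      params={'Retriever': {'top_k': 5}, 'Reader': {'top_k': 1}})
print(result['answers'][0].answer)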

MLOps stack

  1. Data Ingestion
    • Apache Airflow, AWS S3
  2. Data Storage
    • S3 Bucket
  3. Data Processing and Feature Engineering
    • Pandas/Spark
  4. Model Training and Validation
    • MLflow
  5. Model Versioning and Registry
    • MLflow Model Registry (see the registry sketch after this list)
  6. Model Serving
    • Seldon Core
  7. Monitoring and Logging
    • ELK Stack
  8. CI/CD
    • Jenkins
  9. RAG components
    • Pinecone for Vector Database
    • Haystack for Document Retrieval and Question Answering
    • LangChain for Chain of Components
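
For step 5, promoting a trained model in the MLflow Model Registry might look like the sketch below. The model name rag_model matches the URI used in the serving code further down; the run id is a placeholder.

import mlflow
from mlflow.tracking import MlflowClient

# Register the model logged by a training run, then promote it to Production
run_id = '<run-id-from-training>'  # placeholder
registered = mlflow.register_model(f'runs:/{run_id}/model', 'rag_model')

client = MlflowClient()
client.transition_model_version_stage(name='rag_model',
                                      version=registered.version,
                                      stage='Production')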

Dummy code for reference

Index processed data into Pinecone

import pinecone
import boto3
import pandas as pd
from sentence_transformers import SentenceTransformer

# Initialize Pinecone
pinecone.init(api_key='pinecone-api-key', environment='us-west1-gcp')
index_name = 'construction-docs'
if index_name not in pinecone.list_indexes():
    pinecone.create_index(index_name, dimension=768, metric='cosine')

index = pinecone.Index(index_name)

# Initialize Sentence Transformer model
model = SentenceTransformer('paraphrase-MiniLM-L6-v2')

# Load processed data from S3
s3_client = boto3.client('s3')
s3_client.download_file('bucketname', 'processed/processed_data.csv', '/tmp/processed_data.csv')
processed_data = pd.read_csv('/tmp/processed_data.csv')

# Convert data to documents and embeddings
documents = processed_data['column_name'].astype(str).tolist()
embeddings = model.encode(documents).tolist()

# Prepare documents for Pinecone
pinecone_docs = [{'id': str(i), 'values': embedding, 'metadata': {'text': doc}} for i, (embedding, doc) in enumerate(zip(embeddings, documents))]

# Index documents in Pinecone
index.upsert(pinecone_docs)
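
For reference, querying the same index with an embedded question could look like this (same pinecone-client style as above; the question is just an example):

# Embed the question with the same model and search the index
query_vector = model.encode('What is the curing time for concrete?').tolist()
results = index.query(vector=query_vector, top_k=5, include_metadata=True)
for match in results['matches']:
    print(match['score'], match['metadata']['text'])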

Integrate the RAG Pipeline with Preprocessing and Model Training

# Data processing and saving processed data to S3 : DUMMY CODE
import pandas as pd
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

data = pd.read_csv('s3://bucket/data/data.csv')
processed_data = data[data['example_column'] > 1]
processed_data.to_csv('s3://bucket/processed/processed_data.csv', index=False)

# Train and log model with MLFlow
def train_model():
    data = pd.read_csv('s3://bucket/processed/processed_data.csv')
    X = data.drop('target', axis=1)
    y = data['target']

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

    with mlflow.start_run():
        model = RandomForestClassifier()
        model.fit(X_train, y_train)

        accuracy = model.score(X_test, y_test)
        mlflow.log_metric("accuracy", accuracy)
        mlflow.sklearn.log_model(model, "model")

train_model()
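
The logged model can then be loaded back for a quick sanity check; the run id below is a placeholder.

# Load the model logged above and score a few rows (replace <run_id> with the actual run id)
loaded_model = mlflow.sklearn.load_model('runs:/<run_id>/model')
sample = pd.read_csv('s3://bucket/processed/processed_data.csv').drop('target', axis=1).head()
print(loaded_model.predict(sample))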

RAG pipeline

  • Load your custom-trained model from MLflow.
  • Create a custom reader by subclassing BaseReader in Haystack.
  • Initialize Pinecone Document Store using Haystack.
  • Use DensePassageRetriever for retrieving documents.
  • Create an ExtractiveQAPipeline using the custom reader and retriever.
  • Ask questions to the pipeline and get answers.
import torch
from haystack.nodes import BaseReader

class CustomReader(BaseReader):
    def __init__(self, model, tokenizer):
        super().__init__()
        self.model = model
        self.tokenizer = tokenizer

    def predict(self, query, documents, top_k=10):
        results = []
        for doc in documents:
            inputs = self.tokenizer.encode_plus(query, doc.content, return_tensors="pt")
            with torch.no_grad():
                outputs = self.model(**inputs)
            start_scores, end_scores = outputs.start_logits, outputs.end_logits

            # Get the most likely beginning and end of answer with the argmax of the score
            answer_start = torch.argmax(start_scores)
            answer_end = torch.argmax(end_scores) + 1
            answer = self.tokenizer.convert_tokens_to_string(self.tokenizer.convert_ids_to_tokens(inputs["input_ids"][0][answer_start:answer_end]))

            results.append({"answer": answer, "score": start_scores.max().item()})

        results = sorted(results, key=lambda x: x['score'], reverse=True)[:top_k]
        return results


import mlflow.pytorch
from transformers import AutoTokenizer
from haystack import Document
from haystack.pipelines import ExtractiveQAPipeline, Pipeline
from haystack.nodes import DensePassageRetriever
from haystack.document_stores import PineconeDocumentStore

# Initialize Pinecone Document Store
document_store = PineconeDocumentStore(api_key='your-pinecone-api-key', index='construction-docs')

# Initialize Dense Passage Retriever
retriever = DensePassageRetriever(document_store=document_store)

# Load the custom-trained model from MLflow and wrap it in the custom reader
model = mlflow.pytorch.load_model("models:/rag_model/production")
tokenizer = AutoTokenizer.from_pretrained("model-tokenizer")
reader = CustomReader(model=model, tokenizer=tokenizer)

pipeline = ExtractiveQAPipeline(reader, retriever)

""""
Hybrid pipeline:
# Create a Hybrid Pipeline
pipeline = Pipeline()
pipeline.add_node(component=retriever, name="DenseRetriever", inputs=["Query"])
pipeline.add_node(component=reader, name="ExtractiveReader", inputs=["DenseRetriever"])
pipeline.add_node(component=rag_generator, name="GenerativeReader", inputs=["DenseRetriever"])
""""

# Ask a Question
prediction = pipeline.run(query="What is the best learning practice?", params={"Retriever": {"top_k": 10}, "Reader": {"top_k": 5}})
print(prediction)

Flask REST API Implementation

from flask import Flask, request, jsonify
import mlflow.pytorch
import torch
from transformers import AutoTokenizer
import pinecone
from sentence_transformers import SentenceTransformer
from haystack.document_stores import PineconeDocumentStore
from haystack.nodes import BaseReader, DensePassageRetriever
from haystack.pipelines import Pipeline

app = Flask(__name__)

# Load model from MLflow
model_uri = "models:/rag_model/production"
model = mlflow.pytorch.load_model(model_uri)
tokenizer = AutoTokenizer.from_pretrained("model-tokenizer")

class CustomQAReader(BaseReader):
    def __init__(self, model, tokenizer):
        super().__init__()
        self.model = model
        self.tokenizer = tokenizer

    def predict(self, query, documents, top_k=10):
        results = []
        for doc in documents:
            inputs = self.tokenizer.encode_plus(query, doc.content, return_tensors="pt")
            with torch.no_grad():
                outputs = self.model(**inputs)
            start_scores, end_scores = outputs.start_logits, outputs.end_logits

            # Extractive answer
            answer_start = torch.argmax(start_scores)
            answer_end = torch.argmax(end_scores) + 1
            extractive_answer = self.tokenizer.convert_tokens_to_string(self.tokenizer.convert_ids_to_tokens(inputs["input_ids"][0][answer_start:answer_end]))

            results.append({"extractive_answer": extractive_answer, "score": start_scores.max().item()})

        results = sorted(results, key=lambda x: x['score'], reverse=True)[:top_k]
        return results

# Initialize Pinecone
pinecone.init(api_key='pinecone-api-key', environment='us-west1-gcp')
document_store = PineconeDocumentStore(api_key='pinecone-api-key', index='docs')

# Initialize Dense Passage Retriever
retriever = DensePassageRetriever(document_store=document_store)

# Initialize Custom Reader
reader = CustomQAReader(model=model, tokenizer=tokenizer)

# Create a Hybrid Pipeline
pipeline = Pipeline()
pipeline.add_node(component=retriever, name="DenseRetriever", inputs=["Query"])
pipeline.add_node(component=reader, name="QAReader", inputs=["DenseRetriever"])

@app.route('/query', methods=['POST'])
def query():
    data = request.get_json()
    user_query = data['query']
    result = pipeline.run(query=user_query, params={"DenseRetriever": {"top_k": 10}, "QAReader": {"top_k": 3}})
    return jsonify(result)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
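
Once the app is running, the endpoint can be exercised with a small client script (port and payload match the app above):

import requests

# POST a question to the /query endpoint of the Flask app
response = requests.post('http://localhost:5000/query',
                         json={'query': 'What is the best learning practice?'})
print(response.json())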


.ipynb Notebook