import os
# import dotenv

# %reload_ext dotenv
# %dotenv

file_path = "../pre-requisites.ipynb"
default_file_path = True
if(not os.path.exists(file_path)):
    default_file_path = False
    file_path = "./pre-requisites.ipynb"

%run -i {file_path}
import os
import json
from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient
from azure.search.documents.models import VectorizedQuery
from azure.search.documents.indexes.models import (
    SearchIndex,
    ScoringProfile,
    SearchFieldDataType,
    SimpleField,
    SearchableField,
    SearchField,
    SemanticConfiguration,
    SemanticField,
    VectorSearchProfile,
    HnswAlgorithmConfiguration,
    VectorSearch,
    HnswParameters,
    SemanticPrioritizedFields,
    SemanticSearch,
)
from azure.search.documents.indexes import SearchIndexClient
import os.path

Create Index Function#

def create_index(search_index_name, vector_search_dimensions=1536):
    client = SearchIndexClient(service_endpoint, credential)

    # 1. Define the fields
    fields = [
        SimpleField(
            name="chunkId",
            type=SearchFieldDataType.String,
            sortable=True,
            filterable=True,
            key=True,
        ),
        SimpleField(
            name="source",
            type=SearchFieldDataType.String,
            sortable=True,
            filterable=True,
        ),
        SearchableField(name="chunkContent", type=SearchFieldDataType.String),
        SearchField(
            name="chunkContentVector",
            type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
            searchable=True,
            # the dimension of the embedded vector
            vector_search_dimensions=vector_search_dimensions,
            vector_search_profile_name="my-vector-config",
        ),
    ]

    # 2. Configure the vector search configuration
    vector_search = VectorSearch(
        profiles=[
            VectorSearchProfile(
                name="my-vector-config",
                algorithm_configuration_name="my-algorithms-config",
            )
        ],
        algorithms=[
            # Contains configuration options specific to the hnsw approximate nearest neighbors  algorithm used during indexing and querying
            HnswAlgorithmConfiguration(
                name="my-algorithms-config",
                kind="hnsw",
                # https://learn.microsoft.com/en-us/python/api/azure-search-documents/azure.search.documents.indexes.models.hnswparameters?view=azure-python-preview#variables
                parameters=HnswParameters(
                    m=4,
                    # The size of the dynamic list containing the nearest neighbors, which is used during index time.
                    # Increasing this parameter may improve index quality, at the expense of increased indexing time.
                    ef_construction=400,
                    # The size of the dynamic list containing the nearest neighbors, which is used during search time.
                    # Increasing this parameter may improve search results, at the expense of slower search.
                    ef_search=500,
                    # The similarity metric to use for vector comparisons.
                    # Known values are: "cosine", "euclidean", and "dotProduct"
                    metric="cosine",
                ),
            )
        ],
    )

    index = SearchIndex(
        name=search_index_name,
        fields=fields,
        vector_search=vector_search,
    )

    result = client.create_or_update_index(index)
    print(f"Index: '{result.name}' created or updated")

Create Chunking Function#

import glob
from langchain_community.document_loaders import UnstructuredFileLoader
from langchain.text_splitter import MarkdownTextSplitter
import json

Create chunks using markdown text splitter

import json
from langchain.text_splitter import MarkdownHeaderTextSplitter


def create_md_header_chunks_and_save_to_file(documents, path_to_output) -> list:
    try:
        if os.path.exists(path_to_output):
            print(f"Chunks already created at: {path_to_output} ")
            return
        lengths = {}
        all_chunks = []
        chunk_id = 0
        # tqdm.tqdm(
        headers_to_split_on = [
            ("#", "Header 1"),
            ("##", "Header 2"),
            ("###", "Header 3"),
        ]

        markdown_splitter = MarkdownHeaderTextSplitter(
            headers_to_split_on=headers_to_split_on, strip_headers=False)

        for document in documents:
            current_chunks_text_list = markdown_splitter.split_text(
                document[0].page_content)
            source = document[0].metadata["source"]

            for i, chunk in enumerate(current_chunks_text_list):

                current_chunk_dict = {
                    "chunkId": f"chunk{chunk_id}_{i}",
                    "chunkContent": chunk.page_content,
                    "source": source,
                }
                all_chunks.append(current_chunk_dict)
            chunk_id += 1
        with open(path_to_output, "w") as f:
            json.dump(all_chunks, f)
    except Exception as e:
        print(f"Error creating chunks: {e}")
    return all_chunks
# path_to_output = f"./output/pre-generated/chunking/md-header-text-splitter-engineering-mlops.json"

# create_md_header_chunks_and_save_to_file(documents, path_to_output)
%%capture --no-display
def load_documents_from_folder(path, totalNumberOfDocuments=200) -> list[str]:
    print("Loading documents...")
    markdown_documents = []
    i = 0
    for file in glob.glob(path, recursive=True):
        loader = UnstructuredFileLoader(file)
        document = loader.load()
        markdown_documents.append(document)
        if i == totalNumberOfDocuments:
            return markdown_documents
        i += 1
    return markdown_documents


def create_chunks_and_save_to_file(path_to_output, totalNumberOfDocuments=200, chunk_size=300, chunk_overlap=30) -> list:
    if (os.path.exists(path_to_output)):
        print(f"Chunks already created at: {path_to_output} ")
        return

    documents = load_documents_from_folder(
        "..\data\docs\code-with-engineering\**\*.md", totalNumberOfDocuments)

    print("Creating chunks...")
    markdown_splitter = MarkdownTextSplitter.from_tiktoken_encoder(
        chunk_size=chunk_size, chunk_overlap=chunk_overlap
    )
    lengths = {}
    all_chunks = []
    chunk_id = 0
    for document in documents:
        current_chunks_text_list = markdown_splitter.split_text(
            document[0].page_content
        )  # output = ["content chunk1", "content chunk2", ...]

        for i, chunk in enumerate(
            current_chunks_text_list
        ):  # (0, "content chunk1"), (1, "content chunk2"), ...
            current_chunk_dict = {
                "chunkId": f"chunk{chunk_id}_{i}",
                "chunkContent": chunk,
                "source": document[0].metadata["source"],
            }
            all_chunks.append(current_chunk_dict)

        chunk_id += 1

        n_chunks = len(current_chunks_text_list)
        # lengths = {[Number of chunks]: [number of documents with that number of chunks]}
        if n_chunks not in lengths:
            lengths[n_chunks] = 1
        else:
            lengths[n_chunks] += 1

    with open(path_to_output, "w") as f:
        json.dump(all_chunks, f)
    print(f"Chunks created: ", lengths)
    return all_chunks

Create Embeddings Functions#

Create embeddings using AOI:

import time
import requests


def oai_query_embedding(
    query,
    endpoint=azure_aoai_endpoint,
    api_key=azure_openai_key,
    api_version="2023-07-01-preview",
    embedding_model_deployment=azure_openai_embedding_deployment,
):
    """
    Query the OpenAI Embedding model to get the embeddings for the given query.

    Args:
    query (str): The query for which to get the embeddings.
    endpoint (str): The endpoint for the OpenAI service.
    api_key (str): The API key for the OpenAI service.
    api_version (str): The API version for the OpenAI service.
    embedding_model_deployment (str): The deployment for the OpenAI embedding model.
    Returns:
    list: The embeddings for the given query.
    """
    # If input has more than 8,000 words, shrink it to max
    if len(query) > 8000:
        print("Shrinked!!1")
        query = query[:8000]
    # print(len(query))
    request_url = f"{endpoint}/openai/deployments/{embedding_model_deployment}/embeddings?api-version={api_version}"
    headers = {"Content-Type": "application/json", "api-key": api_key}
    request_payload = {"input": query}
    embedding_response = requests.post(
        request_url, json=request_payload, headers=headers, timeout=None
    )
    # embedding_response = embed_input(query)
    if embedding_response.status_code == 200:
        #     time.sleep(2.5)
        #     embedding_response = embed_input(query)

        data_values = embedding_response.json()["data"]
        embeddings_vectors = [data_value["embedding"]
                              for data_value in data_values]
        return embeddings_vectors[0]
    else:
        print("Failed to get embedding: ", embedding_response.json())
        print("Length: ", len(query))
        return []
        # print("Retried")
        # raise Exception(
        #     f"failed to get embedding: {embedding_response.json()}")

Create embeddings using OpenAI

Create embeddings using AOI in batch:

Took 48s

import json


def generate_embeddings_for_chunks_in_batch(path_to_chunked_documents, path_to_output_file):
    """
    Generate embeddings for chunked data
    Args:
    path_to_chunked_documents: str: path to the input file
    path_to_output_file: str: path to the output file
    """
    if os.path.exists(path_to_output_file):
        print(f"Embeddings were already created for chunked data {path_to_chunked_documents} at: {path_to_chunked_documents}")
        return
    try:
        with open(path_to_chunked_documents, "r", encoding="utf-8") as file:
            input_data = json.load(file)
            batch_size = 32
            num_chunks = len(input_data)
            for i in range(0, num_chunks, batch_size):
                batch_chunks = input_data[i:i + batch_size]
                batch_chunks_content = [chunk["chunkContent"]
                                        for chunk in batch_chunks]
                batch_embeddings = oai_query_embedding(
                    batch_chunks_content, batch=True)
                for j, chunk in enumerate(batch_chunks):
                    # print("j : ", j)
                    chunk["chunkContentVector"] = batch_embeddings[j]
                    # content = chunk["chunkContent"]
                    # content_embeddings = oai_query_embedding(content)
                    # chunk["chunkContentVector"] = content_embeddings

        with open(path_to_output_file, "w") as f:
            json.dump(input_data, f)
    except Exception as e:
        print(f"Failed to generate embeddings for chunks: {e}")

Create embeddings using AOI and save to file:

Took 16 mins

def generate_embeddings_for_chunks_and_save_to_file(path_to_chunks_file, path_to_output):
    try:
        if (os.path.exists(path_to_output)):
            print(
                f"Embeddings were already created for chunked data at: {path_to_chunks_file} ")
            return
        # i = 0
        with open(path_to_chunks_file, "r", encoding="utf-8") as file:
            input_data = json.load(file)

            for chunk in input_data:
                content = chunk["chunkContent"]
                # print(f"Length: {len(content)}")
                # print(i)
                # print(chunk["chunkId"])

                content_emebddings = oai_query_embedding(content)
                chunk["chunkContentVector"] = content_emebddings
                # i = i+1
        print(f"Created {len(input_data)} chunks")
        print(f"Example of one chunk: {input_data[1]}")

        with open(path_to_output, "w") as f:
            json.dump(input_data, f)
            print(f"Saved embeddings to: {path_to_output}")

    except Exception as e:
        print(f"Failed to generate embeddings: {e}")
from sentence_transformers import SentenceTransformer
import os


def intfloat_e5_small_v2_query_embedding(chunk, model=SentenceTransformer("intfloat/e5-small-v2")):
    embedded_input = model.encode(
        chunk, normalize_embeddings=True
    )  # Note that the type is a ndarray.
    return (
        embedded_input.tolist()
    )  # We need to reshape the array to be a list of floats
/Users/raouf/handsontest/ai-hands-on-lab/.venv/lib/python3.9/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html
  from .autonotebook import tqdm as notebook_tqdm

Upload data to the Index Function#

def upload_data(file_path, search_index_name):
    try:
        with open(file_path, "r") as file:
            documents = json.load(file)

        search_client = SearchClient(
            endpoint=service_endpoint,
            index_name=search_index_name,
            credential=credential,
        )
        for i in range(0, len(documents)):
            try:
                search_client.upload_documents(documents[i])
            except Exception as e:
                print(f"Error uploading document {i}: {e}")

        print(
            f"Uploaded {len(documents)} documents to Index: {search_index_name}")
    except Exception as e:
        print(f"Error uploading documents: {e}")

Generate embeddings using open source model for fixed size chunking#

Took 22 minutes.

import json
from sentence_transformers import SentenceTransformer
import os


def embed_chunk(chunk, model=SentenceTransformer("intfloat/e5-small-v2")):
    embedded_input = model.encode(
        chunk, normalize_embeddings=True
    )  # Note that the type is a ndarray.
    return (
        embedded_input.tolist()
    )  # We need to reshape the array to be a list of floats


def generate_embeddings_with_intfloat_e5_small_v2(
    path_to_input_file, path_to_output_file
):
    if os.path.exists(path_to_output_file):
        print(
            f"Embeddings were already created for chunked data {path_to_input_file} at: {path_to_input_file} ")
        return
    try:
        model = SentenceTransformer("intfloat/e5-small-v2")
        with open(path_to_input_file, "r", encoding="utf-8") as file:
            input_data = json.load(file)
            for chunk in input_data:
                content = chunk["chunkContent"]
                content_emebddings = embed_chunk(content, model)
                chunk["chunkContentVector"] = content_emebddings

        with open(path_to_output_file, "w") as f:
            json.dump(input_data, f)
    except Exception as e:
        print(f"Failed to generate embeddings for chunks: {e}")


# e5_small_v2_prefix = "fixed-size-chunks-180-30-engineering-mlops-e5-small-v2"
# path_to_output_file = f"../output/pre-generated/embeddings/{
#     e5_small_v2_prefix}.json"
# pregenerated_fixed_size_chunks = '../output/pre-generated/chunking/fixed-size-chunks-engineering-mlops-180-30.json'
# generate_embeddings_with_intfloat_e5_small_v2(
#     path_to_input_file=pregenerated_fixed_size_chunks,
#     path_to_output_file=path_to_output_file,
# )

Generate embeddings using open source model for semantic chunking#

Took 12 minutes.

# path_to_chunked_documents = "../output/pre-generated/chunking/semantic-chunks-engineering-mlops.json"
# e5_small_v2_prefix = "semantic-chunking-engineering-mlops-e5-small-v2"
# path_to_output_file = f"../output/pre-generated/embeddings/{
#     e5_small_v2_prefix}.json"
# generate_embeddings_with_intfloat_e5_small_v2(
#     path_to_input_file=path_to_chunked_documents,
#     path_to_output_file=path_to_output_file,
# )

Search documents Function#

def search_documents(search_index_name, input, embedding_function):
    search_client = SearchClient(
        service_endpoint, search_index_name, credential=credential
    )
    query_embeddings = embedding_function(input)
    vector_query = VectorizedQuery(
        vector=query_embeddings, k_nearest_neighbors=3, fields="chunkContentVector"
    )

    results = search_client.search(
        search_text=None,
        vector_queries=[vector_query],
        select=["chunkContent", "chunkId", "source", "chunkContentVector"],
    )
    # print_results(results)

    documents = []
    for document in results:
        item = {}
        item["chunkContent"] = document["chunkContent"]
        item["source"] = os.path.normpath(document["source"])
        item["chunkId"] = document["chunkId"]
        item["score"] = document['@search.score']
        item["chunkContentVector"] = document["chunkContentVector"]
        documents.append(item)

    return documents

Create a prompt#

def create_prompt(query, documents):
    system_prompt = f"""

    Instructions:

    "You are an AI assistant that helps users answer questions given a specific context.
    You will be given a context ("chunkContent") in Retrieved Documents and will be asked a question (User Question) based on that context.
    Your answer should be as precise as possible and should only come from the context.
    Please add citation after each sentence when possible in a form "(Source: source+chunkId),
    where both 'source' and 'chunkId' are taken from the Retrieved Documents."
    """

    user_prompt = f"""
    ## Retrieve Documents:
    {documents}

    ## User Question
    {query}
    """

    final_message = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt + "\nEND OF CONTEXT"},
    ]

    return final_message

Call LLM#

from openai import AzureOpenAI


def call_llm(messages: list[dict]):
    client = AzureOpenAI(
        api_key=azure_openai_key,
        api_version="2023-07-01-preview",
        azure_endpoint=azure_aoai_endpoint
    )

    response = client.chat.completions.create(
        model=azure_openai_chat_deployment, messages=messages)
    return response.choices[0].message.content