import os
# import dotenv
# %reload_ext dotenv
# %dotenv
file_path = "../pre-requisites.ipynb"
default_file_path = True
if(not os.path.exists(file_path)):
default_file_path = False
file_path = "./pre-requisites.ipynb"
%run -i {file_path}
import os
import json
from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient
from azure.search.documents.models import VectorizedQuery
from azure.search.documents.indexes.models import (
SearchIndex,
ScoringProfile,
SearchFieldDataType,
SimpleField,
SearchableField,
SearchField,
SemanticConfiguration,
SemanticField,
VectorSearchProfile,
HnswAlgorithmConfiguration,
VectorSearch,
HnswParameters,
SemanticPrioritizedFields,
SemanticSearch,
)
from azure.search.documents.indexes import SearchIndexClient
import os.path
Create Index Function#
def create_index(search_index_name, vector_search_dimensions=1536):
client = SearchIndexClient(service_endpoint, credential)
# 1. Define the fields
fields = [
SimpleField(
name="chunkId",
type=SearchFieldDataType.String,
sortable=True,
filterable=True,
key=True,
),
SimpleField(
name="source",
type=SearchFieldDataType.String,
sortable=True,
filterable=True,
),
SearchableField(name="chunkContent", type=SearchFieldDataType.String),
SearchField(
name="chunkContentVector",
type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
searchable=True,
# the dimension of the embedded vector
vector_search_dimensions=vector_search_dimensions,
vector_search_profile_name="my-vector-config",
),
]
# 2. Configure the vector search configuration
vector_search = VectorSearch(
profiles=[
VectorSearchProfile(
name="my-vector-config",
algorithm_configuration_name="my-algorithms-config",
)
],
algorithms=[
# Contains configuration options specific to the hnsw approximate nearest neighbors algorithm used during indexing and querying
HnswAlgorithmConfiguration(
name="my-algorithms-config",
kind="hnsw",
# https://learn.microsoft.com/en-us/python/api/azure-search-documents/azure.search.documents.indexes.models.hnswparameters?view=azure-python-preview#variables
parameters=HnswParameters(
m=4,
# The size of the dynamic list containing the nearest neighbors, which is used during index time.
# Increasing this parameter may improve index quality, at the expense of increased indexing time.
ef_construction=400,
# The size of the dynamic list containing the nearest neighbors, which is used during search time.
# Increasing this parameter may improve search results, at the expense of slower search.
ef_search=500,
# The similarity metric to use for vector comparisons.
# Known values are: "cosine", "euclidean", and "dotProduct"
metric="cosine",
),
)
],
)
index = SearchIndex(
name=search_index_name,
fields=fields,
vector_search=vector_search,
)
result = client.create_or_update_index(index)
print(f"Index: '{result.name}' created or updated")
Create Chunking Function#
import glob
from langchain_community.document_loaders import UnstructuredFileLoader
from langchain.text_splitter import MarkdownTextSplitter
import json
Create chunks using the Markdown header text splitter:
import json
from langchain.text_splitter import MarkdownHeaderTextSplitter
def create_md_header_chunks_and_save_to_file(documents, path_to_output) -> list:
try:
if os.path.exists(path_to_output):
print(f"Chunks already created at: {path_to_output} ")
return []
all_chunks = []
chunk_id = 0
headers_to_split_on = [
("#", "Header 1"),
("##", "Header 2"),
("###", "Header 3"),
]
markdown_splitter = MarkdownHeaderTextSplitter(
headers_to_split_on=headers_to_split_on, strip_headers=False)
for document in documents:
current_chunks_text_list = markdown_splitter.split_text(
document[0].page_content)
source = document[0].metadata["source"]
for i, chunk in enumerate(current_chunks_text_list):
current_chunk_dict = {
"chunkId": f"chunk{chunk_id}_{i}",
"chunkContent": chunk.page_content,
"source": source,
}
all_chunks.append(current_chunk_dict)
chunk_id += 1
with open(path_to_output, "w") as f:
json.dump(all_chunks, f)
except Exception as e:
print(f"Error creating chunks: {e}")
return all_chunks
# path_to_output = f"./output/pre-generated/chunking/md-header-text-splitter-engineering-mlops.json"
# create_md_header_chunks_and_save_to_file(documents, path_to_output)
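A minimal usage sketch, assuming the load_documents_from_folder helper defined in the next cell and a hypothetical output path; each entry in documents is the single-element list returned by UnstructuredFileLoader, which is why the function reads document[0].
# documents = load_documents_from_folder("../data/docs/code-with-engineering/**/*.md", 50)
# create_md_header_chunks_and_save_to_file(documents, "./output/md-header-chunks-engineering-mlops.json")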
%%capture --no-display
def load_documents_from_folder(path, totalNumberOfDocuments=200) -> list[str]:
print("Loading documents...")
markdown_documents = []
i = 0
for file in glob.glob(path, recursive=True):
loader = UnstructuredFileLoader(file)
document = loader.load()
markdown_documents.append(document)
if i == totalNumberOfDocuments - 1:  # stop once totalNumberOfDocuments files have been loaded
return markdown_documents
i += 1
return markdown_documents
def create_chunks_and_save_to_file(path_to_output, totalNumberOfDocuments=200, chunk_size=300, chunk_overlap=30) -> list:
if (os.path.exists(path_to_output)):
print(f"Chunks already created at: {path_to_output} ")
return
documents = load_documents_from_folder(
"..\data\docs\code-with-engineering\**\*.md", totalNumberOfDocuments)
print("Creating chunks...")
markdown_splitter = MarkdownTextSplitter.from_tiktoken_encoder(
chunk_size=chunk_size, chunk_overlap=chunk_overlap
)
lengths = {}
all_chunks = []
chunk_id = 0
for document in documents:
current_chunks_text_list = markdown_splitter.split_text(
document[0].page_content
) # output = ["content chunk1", "content chunk2", ...]
for i, chunk in enumerate(
current_chunks_text_list
): # (0, "content chunk1"), (1, "content chunk2"), ...
current_chunk_dict = {
"chunkId": f"chunk{chunk_id}_{i}",
"chunkContent": chunk,
"source": document[0].metadata["source"],
}
all_chunks.append(current_chunk_dict)
chunk_id += 1
n_chunks = len(current_chunks_text_list)
# lengths = {[Number of chunks]: [number of documents with that number of chunks]}
if n_chunks not in lengths:
lengths[n_chunks] = 1
else:
lengths[n_chunks] += 1
with open(path_to_output, "w") as f:
json.dump(all_chunks, f)
print(f"Chunks created: ", lengths)
return all_chunks
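A minimal call sketch with a hypothetical output path; chunk_size and chunk_overlap are measured in tokens because the splitter is built from the tiktoken encoder.
# create_chunks_and_save_to_file(
#     "./output/fixed-size-chunks-engineering-mlops-300-30.json",
#     totalNumberOfDocuments=200, chunk_size=300, chunk_overlap=30)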
Create Embeddings Functions#
Create embeddings using Azure OpenAI (AOAI):
import time
import requests
def oai_query_embedding(
query,
endpoint=azure_aoai_endpoint,
api_key=azure_openai_key,
api_version="2023-07-01-preview",
embedding_model_deployment=azure_openai_embedding_deployment,
batch=False,
):
"""
Query the OpenAI Embedding model to get the embeddings for the given query.
Args:
query (str): The query for which to get the embeddings.
endpoint (str): The endpoint for the OpenAI service.
api_key (str): The API key for the OpenAI service.
api_version (str): The API version for the OpenAI service.
embedding_model_deployment (str): The deployment for the OpenAI embedding model.
batch (bool): Whether "query" is a list of inputs; if True, a list of embedding vectors is returned.
Returns:
list: The embeddings for the given query.
"""
# If a single input is longer than 8,000 characters, truncate it to stay within the embedding model's input limit
if isinstance(query, str) and len(query) > 8000:
print("Input truncated to 8,000 characters")
query = query[:8000]
# print(len(query))
request_url = f"{endpoint}/openai/deployments/{embedding_model_deployment}/embeddings?api-version={api_version}"
headers = {"Content-Type": "application/json", "api-key": api_key}
request_payload = {"input": query}
embedding_response = requests.post(
request_url, json=request_payload, headers=headers, timeout=None
)
# embedding_response = embed_input(query)
if embedding_response.status_code == 200:
# time.sleep(2.5)
# embedding_response = embed_input(query)
data_values = embedding_response.json()["data"]
embeddings_vectors = [data_value["embedding"] for data_value in data_values]
return embeddings_vectors if batch else embeddings_vectors[0]
else:
print("Failed to get embedding: ", embedding_response.json())
print("Length: ", len(query))
return []
# print("Retried")
# raise Exception(
# f"failed to get embedding: {embedding_response.json()}")
Create embeddings using Azure OpenAI in batch:
Took about 48 seconds.
import json
def generate_embeddings_for_chunks_in_batch(path_to_chunked_documents, path_to_output_file):
"""
Generate embeddings for chunked data
Args:
path_to_chunked_documents: str: path to the input file
path_to_output_file: str: path to the output file
"""
if os.path.exists(path_to_output_file):
print(f"Embeddings were already created for chunked data {path_to_chunked_documents} at: {path_to_chunked_documents}")
return
try:
with open(path_to_chunked_documents, "r", encoding="utf-8") as file:
input_data = json.load(file)
batch_size = 32
num_chunks = len(input_data)
for i in range(0, num_chunks, batch_size):
batch_chunks = input_data[i:i + batch_size]
batch_chunks_content = [chunk["chunkContent"]
for chunk in batch_chunks]
batch_embeddings = oai_query_embedding(
batch_chunks_content, batch=True)
for j, chunk in enumerate(batch_chunks):
# print("j : ", j)
chunk["chunkContentVector"] = batch_embeddings[j]
# content = chunk["chunkContent"]
# content_embeddings = oai_query_embedding(content)
# chunk["chunkContentVector"] = content_embeddings
with open(path_to_output_file, "w") as f:
json.dump(input_data, f)
except Exception as e:
print(f"Failed to generate embeddings for chunks: {e}")
Create embeddings using Azure OpenAI and save to file:
Took about 16 minutes.
def generate_embeddings_for_chunks_and_save_to_file(path_to_chunks_file, path_to_output):
try:
if (os.path.exists(path_to_output)):
print(
f"Embeddings were already created for chunked data at: {path_to_chunks_file} ")
return
# i = 0
with open(path_to_chunks_file, "r", encoding="utf-8") as file:
input_data = json.load(file)
for chunk in input_data:
content = chunk["chunkContent"]
# print(f"Length: {len(content)}")
# print(i)
# print(chunk["chunkId"])
content_embeddings = oai_query_embedding(content)
chunk["chunkContentVector"] = content_embeddings
# i = i+1
print(f"Created {len(input_data)} chunks")
print(f"Example of one chunk: {input_data[1]}")
with open(path_to_output, "w") as f:
json.dump(input_data, f)
print(f"Saved embeddings to: {path_to_output}")
except Exception as e:
print(f"Failed to generate embeddings: {e}")
from sentence_transformers import SentenceTransformer
import os
def intfloat_e5_small_v2_query_embedding(chunk, model=SentenceTransformer("intfloat/e5-small-v2")):
embedded_input = model.encode(
chunk, normalize_embeddings=True
)  # model.encode returns a numpy ndarray
return (
embedded_input.tolist()
)  # Convert the array to a plain list of floats for the search index
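A quick dimensionality check: intfloat/e5-small-v2 produces 384-dimensional vectors, so an index for these embeddings must be created with vector_search_dimensions=384.
# sample_e5_vector = intfloat_e5_small_v2_query_embedding("What is continuous integration?")
# print(len(sample_e5_vector))  # expected: 384 for intfloat/e5-small-v2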
Upload data to the Index Function#
def upload_data(file_path, search_index_name):
try:
with open(file_path, "r") as file:
documents = json.load(file)
search_client = SearchClient(
endpoint=service_endpoint,
index_name=search_index_name,
credential=credential,
)
for i in range(0, len(documents)):
try:
search_client.upload_documents(documents[i])
except Exception as e:
print(f"Error uploading document {i}: {e}")
print(
f"Uploaded {len(documents)} documents to Index: {search_index_name}")
except Exception as e:
print(f"Error uploading documents: {e}")
Generate embeddings using open source model for fixed size chunking#
Took 22 minutes.
import json
from sentence_transformers import SentenceTransformer
import os
def embed_chunk(chunk, model=SentenceTransformer("intfloat/e5-small-v2")):
embedded_input = model.encode(
chunk, normalize_embeddings=True
)  # model.encode returns a numpy ndarray
return (
embedded_input.tolist()
)  # Convert the array to a plain list of floats for the search index
def generate_embeddings_with_intfloat_e5_small_v2(
path_to_input_file, path_to_output_file
):
if os.path.exists(path_to_output_file):
print(
f"Embeddings were already created for chunked data {path_to_input_file} at: {path_to_output_file} ")
return
try:
model = SentenceTransformer("intfloat/e5-small-v2")
with open(path_to_input_file, "r", encoding="utf-8") as file:
input_data = json.load(file)
for chunk in input_data:
content = chunk["chunkContent"]
content_embeddings = embed_chunk(content, model)
chunk["chunkContentVector"] = content_embeddings
with open(path_to_output_file, "w") as f:
json.dump(input_data, f)
except Exception as e:
print(f"Failed to generate embeddings for chunks: {e}")
# e5_small_v2_prefix = "fixed-size-chunks-180-30-engineering-mlops-e5-small-v2"
# path_to_output_file = f"../output/pre-generated/embeddings/{e5_small_v2_prefix}.json"
# pregenerated_fixed_size_chunks = '../output/pre-generated/chunking/fixed-size-chunks-engineering-mlops-180-30.json'
# generate_embeddings_with_intfloat_e5_small_v2(
# path_to_input_file=pregenerated_fixed_size_chunks,
# path_to_output_file=path_to_output_file,
# )
Generate embeddings using open source model for semantic chunking#
Took 12 minutes.
# path_to_chunked_documents = "../output/pre-generated/chunking/semantic-chunks-engineering-mlops.json"
# e5_small_v2_prefix = "semantic-chunking-engineering-mlops-e5-small-v2"
# path_to_output_file = f"../output/pre-generated/embeddings/{e5_small_v2_prefix}.json"
# generate_embeddings_with_intfloat_e5_small_v2(
# path_to_input_file=path_to_chunked_documents,
# path_to_output_file=path_to_output_file,
# )
Search documents Function#
def search_documents(search_index_name, input, embedding_function):
search_client = SearchClient(
service_endpoint, search_index_name, credential=credential
)
query_embeddings = embedding_function(input)
vector_query = VectorizedQuery(
vector=query_embeddings, k_nearest_neighbors=3, fields="chunkContentVector"
)
results = search_client.search(
search_text=None,
vector_queries=[vector_query],
select=["chunkContent", "chunkId", "source", "chunkContentVector"],
)
# print_results(results)
documents = []
for document in results:
item = {}
item["chunkContent"] = document["chunkContent"]
item["source"] = os.path.normpath(document["source"])
item["chunkId"] = document["chunkId"]
item["score"] = document['@search.score']
item["chunkContentVector"] = document["chunkContentVector"]
documents.append(item)
return documents
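A minimal usage sketch (the index name is a placeholder; pass the embedding function that matches the model used to populate that index):
# results = search_documents("my-ada-002-index", "How should we run code reviews?", oai_query_embedding)
# for doc in results:
#     print(doc["score"], doc["source"], doc["chunkId"])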
Create a prompt#
def create_prompt(query, documents):
system_prompt = """
Instructions:
You are an AI assistant that helps users answer questions given a specific context.
You will be given a context ("chunkContent") in the Retrieved Documents and will be asked a question (User Question) based on that context.
Your answer should be as precise as possible and should only come from the context.
Please add a citation after each sentence when possible, in the form "(Source: source+chunkId)",
where both 'source' and 'chunkId' are taken from the Retrieved Documents.
"""
user_prompt = f"""
## Retrieved Documents:
{documents}
## User Question
{query}
"""
final_message = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt + "\nEND OF CONTEXT"},
]
return final_message
Call LLM#
from openai import AzureOpenAI
def call_llm(messages: list[dict]):
client = AzureOpenAI(
api_key=azure_openai_key,
api_version="2023-07-01-preview",
azure_endpoint=azure_aoai_endpoint
)
response = client.chat.completions.create(
model=azure_openai_chat_deployment, messages=messages)
return response.choices[0].message.content
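Putting the pieces together, a minimal end-to-end sketch (the index name is a placeholder and assumes the index was populated with embeddings from the same model used for the query):
# question = "How should we run code reviews?"
# retrieved_documents = search_documents("my-ada-002-index", question, oai_query_embedding)
# messages = create_prompt(question, retrieved_documents)
# print(call_llm(messages))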