Advanced Chunking Options

Advanced chunking strategies provided by Knowledge Bases for Amazon Bedrock

In this notebook, we will create four knowledge bases to provide sample code for the following chunking options supported by Knowledge Bases for Amazon Bedrock:

  1. Fixed chunking
  2. Semantic chunking
  3. Hierarchical chunking
  4. Custom chunking using a Lambda function

Chunking breaks the text into smaller segments before embedding. The chunking strategy can't be modified after you create the data source. Until recently, Knowledge Bases for Amazon Bedrock supported only a few built-in chunking options: no chunking, fixed-size chunking, and default chunking.

  • With the semantic and hierarchical chunking features (in addition to the existing options) and custom chunking using a Lambda function, customers have more control over how their data is processed and chunked.
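To make the idea of chunking concrete, here is a minimal sketch of fixed-size chunking with overlap. It counts whitespace-separated words as a stand-in for tokens, which is an assumption for illustration only; the function name `fixed_size_chunks` and its parameters are hypothetical and do not reflect how the service tokenizes or splits text.

```python
from typing import List


def fixed_size_chunks(text: str, max_words: int = 50, overlap_words: int = 10) -> List[str]:
    """Split text into windows of max_words words that share overlap_words with the previous window."""
    if not 0 <= overlap_words < max_words:
        raise ValueError("overlap_words must be >= 0 and smaller than max_words")
    words = text.split()
    step = max_words - overlap_words  # how far the window advances each time
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + max_words]))
        # Stop once the current window already reaches the end of the text
        if start + max_words >= len(words):
            break
    return chunks


sample = " ".join(f"w{i}" for i in range(120))
for i, chunk in enumerate(fixed_size_chunks(sample, max_words=50, overlap_words=10), 1):
    print(i, len(chunk.split()), chunk.split()[0])
```

Each chunk starts `max_words - overlap_words` words after the previous one, so neighbouring chunks repeat a little context at their boundary.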

We will use a synthetic 10K report for a fictitious company called Octank Financial as data to demo the solution. After creating the knowledge bases, we will evaluate the results on the same dataset. The focus will be on improving the quality of search results, which in turn will improve the accuracy of responses generated by the foundation model.

1. Import the needed libraries

The first step is to install the prerequisite packages.

%pip install --force-reinstall -q -r utils/requirements.txt
# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")
import botocore
botocore.__version__
import os
import time
import boto3
import logging
import pprint
import json

from utils.knowledge_base import BedrockKnowledgeBase
#Clients
s3_client = boto3.client('s3')
sts_client = boto3.client('sts')
session = boto3.session.Session()
region =  session.region_name
account_id = sts_client.get_caller_identity()["Account"]
bedrock_agent_client = boto3.client('bedrock-agent')
bedrock_agent_runtime_client = boto3.client('bedrock-agent-runtime') 
logging.basicConfig(format='[%(asctime)s] p%(process)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)
region, account_id
import time

# Get the current timestamp
current_time = time.time()

# Format the timestamp as a string
timestamp_str = time.strftime("%Y%m%d%H%M%S", time.localtime(current_time))[-7:]
# Create the suffix using the timestamp
suffix = f"{timestamp_str}"
knowledge_base_name_standard = 'standard-kb'
knowledge_base_name_hierarchical = 'hierarchical-kb'
knowledge_base_name_semantic = 'semantic-kb'
knowledge_base_name_custom = 'custom-chunking-kb'
knowledge_base_description = "Knowledge Base containing complex PDF."
bucket_name = f'{knowledge_base_name_standard}-{suffix}'
intermediate_bucket_name = f'{knowledge_base_name_standard}-intermediate-{suffix}'
lambda_function_name = f'{knowledge_base_name_custom}-lambda-{suffix}'
foundation_model = "anthropic.claude-3-sonnet-20240229-v1:0"

2 - Create knowledge bases with fixed chunking strategy

Let's start by creating a Knowledge Base for Amazon Bedrock to store the Octank Financial 10K report. Knowledge Bases allow you to integrate with different vector databases including Amazon OpenSearch Serverless, Amazon Aurora, Pinecone, Redis Enterprise and MongoDB Atlas. For this example, we will integrate the knowledge base with Amazon OpenSearch Serverless. To do so, we will use the helper class BedrockKnowledgeBase, which will create the knowledge base and all of its prerequisites:

  1. IAM roles and policies
  2. S3 bucket
  3. Amazon OpenSearch Serverless encryption, network and data access policies
  4. Amazon OpenSearch Serverless collection
  5. Amazon OpenSearch Serverless vector index
  6. Knowledge base
  7. Knowledge base data source

First we will create a knowledge base using the fixed chunking strategy, followed by the hierarchical chunking strategy.

Parameter values:

"chunkingStrategy": "FIXED_SIZE | NONE | HIERARCHICAL | SEMANTIC"

knowledge_base_name_standard
knowledge_base_standard = BedrockKnowledgeBase(
    kb_name=f'{knowledge_base_name_standard}-{suffix}',
    kb_description=knowledge_base_description,
    data_bucket_name=bucket_name,
    chunking_strategy = "FIXED_SIZE", 
    suffix = f'{suffix}-f'
)
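The helper class hides the request details. As a rough sketch, the `chunkingConfiguration` that goes into a data source's `vectorIngestionConfiguration` could be built per strategy as shown below. The function name `chunking_configuration` is hypothetical, and the numeric values are illustrative assumptions, not necessarily what the helper class sends.

```python
def chunking_configuration(strategy: str) -> dict:
    """Return an illustrative chunkingConfiguration dict for the given strategy."""
    strategy = strategy.upper()
    if strategy == "NONE":
        # Each document is treated as a single chunk
        return {"chunkingStrategy": "NONE"}
    if strategy == "FIXED_SIZE":
        return {
            "chunkingStrategy": "FIXED_SIZE",
            "fixedSizeChunkingConfiguration": {
                "maxTokens": 300,         # assumed value
                "overlapPercentage": 20,  # assumed value
            },
        }
    if strategy == "HIERARCHICAL":
        return {
            "chunkingStrategy": "HIERARCHICAL",
            "hierarchicalChunkingConfiguration": {
                # First level is the parent size, second level the child size
                "levelConfigurations": [{"maxTokens": 1500}, {"maxTokens": 300}],
                "overlapTokens": 60,      # assumed value
            },
        }
    if strategy == "SEMANTIC":
        return {
            "chunkingStrategy": "SEMANTIC",
            "semanticChunkingConfiguration": {
                "maxTokens": 300,                      # assumed value
                "bufferSize": 0,                       # assumed value
                "breakpointPercentileThreshold": 95,   # assumed value
            },
        }
    raise ValueError(f"Unsupported chunking strategy: {strategy}")


print(chunking_configuration("FIXED_SIZE"))
```

Keeping the per-strategy settings in one place makes it easier to vary them later when comparing retrieval quality.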

2.1 Upload the dataset to Amazon S3

Now that we have created the knowledge base, let's populate it with the Octank Financial 10K report dataset. The Knowledge Base data source expects the data to be available in the S3 bucket connected to it, and changes to the data can be synchronized to the knowledge base using the StartIngestionJob API call. In this example we will use the boto3 abstraction of the API, via our helper class.

Let's first upload the report data available in the synthetic_dataset folder to S3.

import os

def upload_directory(path, bucket_name):
    for root, dirs, files in os.walk(path):
        for file in files:
            file_to_upload = os.path.join(root, file)
            if file not in ["LICENSE", "NOTICE", "README.md"]:
                print(f"uploading file {file_to_upload} to {bucket_name}")
                s3_client.upload_file(file_to_upload, bucket_name, file)
            else:
                print(f"Skipping file {file_to_upload}")

upload_directory("../synthetic_dataset", bucket_name)

Now we start the ingestion job.

# ensure that the kb is available
time.sleep(30)
# sync knowledge base
knowledge_base_standard.start_ingestion_job()
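The fixed `time.sleep(30)` above is a simple way to wait. If you call the ingestion APIs directly rather than through the helper class, polling the job status is a common alternative. The sketch below assumes a `bedrock-agent` client and IDs that you obtained yourself; the function name `wait_for_ingestion_job` and its parameters are hypothetical, and the helper class may already wait internally.

```python
import time

# Statuses after which an ingestion job will not change any more
TERMINAL_STATUSES = {"COMPLETE", "FAILED", "STOPPED"}


def wait_for_ingestion_job(client, knowledge_base_id, data_source_id, ingestion_job_id,
                           poll_seconds=10.0, timeout_seconds=900.0, sleep=time.sleep):
    """Poll get_ingestion_job until the job reaches a terminal status or the timeout expires."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        job = client.get_ingestion_job(
            knowledgeBaseId=knowledge_base_id,
            dataSourceId=data_source_id,
            ingestionJobId=ingestion_job_id,
        )["ingestionJob"]
        if job["status"] in TERMINAL_STATUSES:
            return job
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Ingestion job still {job['status']} after {timeout_seconds}s")
        sleep(poll_seconds)


# Example call (requires real IDs and credentials, so it is left commented out):
# job = wait_for_ingestion_job(bedrock_agent_client, kb_id, data_source_id, job_id)
# print(job["status"])
```

The `sleep` parameter is injectable so the loop can be exercised in tests without waiting.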

Finally we save the Knowledge Base Id to test the solution at a later stage.

kb_id_standard = knowledge_base_standard.get_knowledge_base_id()

2.2 Test the Knowledge Base

Now that the Knowledge Base is available, we can test it using the retrieve and retrieve_and_generate functions.

Testing Knowledge Base with Retrieve and Generate API

Let's first test the knowledge base using the retrieve and generate API. With this API, Bedrock takes care of retrieving the necessary references from the knowledge base and generating the final answer using a foundation model from Bedrock.

query = Provide a summary of consolidated statements of cash flows of Octank Financial for the fiscal years ended December 31, 2019.

The right response for this query as per ground truth QA pair is:

The cash flow statement for Octank Financial in the year ended December 31, 2019 reveals the following:
- Cash generated from operating activities amounted to $710 million, which can be attributed to a $700 million profit and non-cash charges such as depreciation and amortization.
- Cash outflow from investing activities totaled $240 million, with major expenditures being the acquisition of property, plant, and equipment ($200 million) and marketable securities ($60 million), partially offset by the sale of property, plant, and equipment ($40 million) and maturing marketable securities ($20 million).
- Financing activities resulted in a cash inflow of $350 million, stemming from the issuance of common stock ($200 million) and long-term debt ($300 million), while common stock repurchases ($50 million) and long-term debt payments ($100 million) reduced the cash inflow. 
Overall, Octank Financial experienced a net cash enhancement of $120 million in 2019, bringing their total cash and cash equivalents to $210 million.
query = "Provide a summary of consolidated statements of cash flows of Octank Financial for the fiscal years ended December 31, 2019."
response = bedrock_agent_runtime_client.retrieve_and_generate(
    input={
        "text": query
    },
    retrieveAndGenerateConfiguration={
        "type": "KNOWLEDGE_BASE",
        "knowledgeBaseConfiguration": {
            'knowledgeBaseId': kb_id_standard,
            "modelArn": "arn:aws:bedrock:{}::foundation-model/{}".format(region, foundation_model),
            "retrievalConfiguration": {
                "vectorSearchConfiguration": {
                    "numberOfResults":5
                } 
            }
        }
    }
)

print(response['output']['text'],end='\n'*2)
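Because the same request is repeated for every knowledge base in this notebook, it can help to build the arguments in one place. The following is a small sketch; `build_rag_request` is a hypothetical helper name, and it only assembles the same dictionary structure used in the call above.

```python
def build_rag_request(query: str, kb_id: str, region: str, model_id: str,
                      number_of_results: int = 5) -> dict:
    """Assemble keyword arguments for retrieve_and_generate against one knowledge base."""
    return {
        "input": {"text": query},
        "retrieveAndGenerateConfiguration": {
            "type": "KNOWLEDGE_BASE",
            "knowledgeBaseConfiguration": {
                "knowledgeBaseId": kb_id,
                "modelArn": f"arn:aws:bedrock:{region}::foundation-model/{model_id}",
                "retrievalConfiguration": {
                    "vectorSearchConfiguration": {"numberOfResults": number_of_results}
                },
            },
        },
    }


# Example call (requires AWS credentials and an existing knowledge base):
# response = bedrock_agent_runtime_client.retrieve_and_generate(
#     **build_rag_request(query, kb_id_standard, region, foundation_model)
# )
```

Only the knowledge base ID changes between the strategies, so the remaining arguments stay identical across comparisons.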

As you can see, with the RetrieveAndGenerate API we get the final response directly. Now let's look at the citations it returns, since the primary focus of this notebook is to observe the retrieved chunks and citations used by the model while generating the response. When we provide relevant context to the foundation model along with the query, it is more likely to generate a high-quality response.

def citations_rag_print(response_ret):
    # response_ret: list of retrieved references; each has content, location, and metadata
    for num,chunk in enumerate(response_ret,1):
        print(f'Chunk {num}: ',chunk['content']['text'],end='\n'*2)
        print(f'Chunk {num} Location: ',chunk['location'],end='\n'*2)
        print(f'Chunk {num} Metadata: ',chunk['metadata'],end='\n'*2)
response_standard = response['citations'][0]['retrievedReferences']
print("# of citations or chunks used to generate the response: ", len(response_standard))
citations_rag_print(response_standard)
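The code above reads only the first entry of `response['citations']`. A response can contain several citation entries, one per generated response part, so a sketch like the following gathers the references from all of them. The helper name `collect_references` is hypothetical, and deduplicating by chunk text is an assumption made for readability.

```python
def collect_references(response: dict) -> list:
    """Gather retrieved references across all citations, keeping the first copy of each chunk text."""
    seen_texts = set()
    references = []
    for citation in response.get("citations", []):
        for ref in citation.get("retrievedReferences", []):
            text = ref.get("content", {}).get("text", "")
            if text not in seen_texts:
                seen_texts.add(text)
                references.append(ref)
    return references


# Hand-written stand-in for a RetrieveAndGenerate response, for illustration only
fake_response = {
    "citations": [
        {"retrievedReferences": [{"content": {"text": "chunk A"}}, {"content": {"text": "chunk B"}}]},
        {"retrievedReferences": [{"content": {"text": "chunk B"}}, {"content": {"text": "chunk C"}}]},
    ]
}
print(len(collect_references(fake_response)))
```

The returned list has the same element structure as `retrievedReferences`, so it can be passed to `citations_rag_print` unchanged.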

Let's now inspect the source information from the knowledge base with the retrieve API.

Testing Knowledge Base with Retrieve API

If you need an extra layer of control, you can retrieve the chunks that best match your query using the Retrieve API. In this setup, you can configure the desired number of results and control the final answer with your own application logic. The API then provides you with the matching content, its S3 location, the similarity score, and the chunk metadata.

def response_print(response_ret):
#structure 'retrievalResults': list of contents. Each list has content, location, score, metadata
    for num,chunk in enumerate(response_ret['retrievalResults'],1):
        print(f'Chunk {num}: ',chunk['content']['text'],end='\n'*2)
        print(f'Chunk {num} Location: ',chunk['location'],end='\n'*2)
        print(f'Chunk {num} Score: ',chunk['score'],end='\n'*2)
        print(f'Chunk {num} Metadata: ',chunk['metadata'],end='\n'*2)
response_standard_ret = bedrock_agent_runtime_client.retrieve(
    knowledgeBaseId=kb_id_standard,
    retrievalConfiguration={
        "vectorSearchConfiguration": {
            "numberOfResults":5,
        } 
    },
    retrievalQuery={
        'text': query
    }
)

print("# of retrieved results: ", len(response_standard_ret['retrievalResults']))
response_print(response_standard_ret)
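As one example of applying your own application logic to Retrieve results, the sketch below keeps only chunks above a score threshold and orders them from best to worst. The helper name `filter_by_score` and the threshold value are assumptions for illustration; a suitable threshold depends on your embedding model and data.

```python
def filter_by_score(retrieve_response: dict, min_score: float = 0.5) -> list:
    """Return retrieval results whose score is at least min_score, highest score first."""
    results = retrieve_response.get("retrievalResults", [])
    kept = [r for r in results if r.get("score", 0.0) >= min_score]
    return sorted(kept, key=lambda r: r.get("score", 0.0), reverse=True)


# Hand-written stand-in for a Retrieve response, for illustration only
fake_retrieve_response = {
    "retrievalResults": [
        {"content": {"text": "cash flow summary"}, "score": 0.62},
        {"content": {"text": "board of directors"}, "score": 0.31},
        {"content": {"text": "financing activities"}, "score": 0.74},
    ]
}
for result in filter_by_score(fake_retrieve_response, min_score=0.5):
    print(result["score"], result["content"]["text"])
```

The filtered chunks could then be placed into your own prompt instead of relying on RetrieveAndGenerate.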

As you can see, with fixed chunking we get the 5 retrieved results requested in the API call, ranked by semantic similarity, which is the default for the Retrieve API. Let's now use the hierarchical chunking strategy and inspect the retrieved results using the RetrieveAndGenerate API as well as the Retrieve API.

3. Create knowledge bases with hierarchical chunking strategy

Concept

Hierarchical chunking organizes your data into a hierarchical structure, allowing for more granular and efficient retrieval based on the inherent relationships within your data. This structure enables your RAG workflow to efficiently navigate and retrieve information from complex, nested datasets. After the documents are parsed, the first step is to chunk them based on the parent and child chunk sizes. The chunks are then organized into a hierarchy, where parent chunks (higher level) represent larger units (for example, documents or sections) and child chunks (lower level) represent smaller units (for example, paragraphs or sentences). The relationships between parent and child chunks are maintained, which allows for efficient retrieval and navigation of the corpus.

Benefits:

  • Efficient retrieval: The hierarchical structure allows faster and more targeted retrieval of relevant information: semantic search is first performed on the child chunks, and the corresponding parent chunks are returned during retrieval. By replacing the child chunks with their parent chunk, we provide larger and more comprehensive context to the FM.
  • Context preservation: Organizing the corpus in a hierarchical manner helps preserve the contextual relationships between chunks, which can be beneficial for generating coherent and contextually relevant text.



Note: In hierarchical chunking, search is performed on child chunks but parent chunks are returned. You might therefore see fewer search results than requested, because one parent can have multiple children.

Hierarchical chunking is best suited for complex documents that have a nested or hierarchical structure, such as technical manuals, legal documents, or academic papers with complex formatting and nested tables.
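The parent and child mechanism described above can be sketched in a few lines. This toy version splits on word counts and ranks child chunks by word overlap with the query, both of which are simplifying assumptions standing in for token-based splitting and embedding similarity; all function names are hypothetical.

```python
from typing import Dict, List, Tuple


def build_hierarchy(text: str, parent_words: int = 40,
                    child_words: int = 10) -> Tuple[Dict[int, str], List[Tuple[int, str]]]:
    """Split text into parent chunks, then split each parent into child chunks that remember their parent."""
    words = text.split()
    parents: Dict[int, str] = {}
    children: List[Tuple[int, str]] = []
    for parent_id, p_start in enumerate(range(0, len(words), parent_words)):
        parent = words[p_start:p_start + parent_words]
        parents[parent_id] = " ".join(parent)
        for c_start in range(0, len(parent), child_words):
            children.append((parent_id, " ".join(parent[c_start:c_start + child_words])))
    return parents, children


def overlap_score(query: str, chunk: str) -> int:
    """Count distinct words shared by query and chunk (a crude stand-in for embedding similarity)."""
    return len(set(query.lower().split()) & set(chunk.lower().split()))


def retrieve_parents(query: str, parents: Dict[int, str],
                     children: List[Tuple[int, str]], top_k: int = 5) -> List[str]:
    """Search the child chunks, then return each matching parent chunk once."""
    ranked = sorted(children, key=lambda pc: overlap_score(query, pc[1]), reverse=True)[:top_k]
    parent_ids: List[int] = []
    for parent_id, _ in ranked:
        if parent_id not in parent_ids:
            parent_ids.append(parent_id)
    return [parents[p] for p in parent_ids]


text = " ".join(["cash"] * 40 + ["debt"] * 40)
parents, children = build_hierarchy(text)
results = retrieve_parents("cash", parents, children, top_k=5)
print(len(children), "children searched,", len(results), "parents returned")
```

Because several of the top-ranked children belong to the same parent, the number of returned parents is smaller than `top_k`.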

Parameter values:

"chunkingStrategy": "FIXED_SIZE | NONE | HIERARCHICAL | SEMANTIC"

knowledge_base_hierarchical = BedrockKnowledgeBase(
    kb_name=f'{knowledge_base_name_hierarchical}-{suffix}',
    kb_description=knowledge_base_description,
    data_bucket_name=bucket_name, 
    chunking_strategy = "HIERARCHICAL", 
    suffix = f'{suffix}-h'
)

Now start the ingestion job. Since we are using the same documents as for fixed chunking, we skip the step of uploading documents to the S3 bucket.

# ensure that the kb is available
time.sleep(30)
# sync knowledge base
knowledge_base_hierarchical.start_ingestion_job()

Save the knowledge base id for further testing.

kb_id_hierarchical = knowledge_base_hierarchical.get_knowledge_base_id()

3.1 Test the Knowledge Base

Now that the Knowledge Base is available, we can test it using the retrieve and retrieve_and_generate functions.

Testing Knowledge Base with Retrieve and Generate API

Let's first test the knowledge base using the retrieve and generate API. With this API, Bedrock takes care of retrieving the necessary references from the knowledge base and generating the final answer using a foundation model from Bedrock.

query = Provide a summary of consolidated statements of cash flows of Octank Financial for the fiscal years ended December 31, 2019.

The right response for this query as per ground truth QA pair is:

The cash flow statement for Octank Financial in the year ended December 31, 2019 reveals the following:
- Cash generated from operating activities amounted to $710 million, which can be attributed to a $700 million profit and non-cash charges such as depreciation and amortization.
- Cash outflow from investing activities totaled $240 million, with major expenditures being the acquisition of property, plant, and equipment ($200 million) and marketable securities ($60 million), partially offset by the sale of property, plant, and equipment ($40 million) and maturing marketable securities ($20 million).
- Financing activities resulted in a cash inflow of $350 million, stemming from the issuance of common stock ($200 million) and long-term debt ($300 million), while common stock repurchases ($50 million) and long-term debt payments ($100 million) reduced the cash inflow. 
Overall, Octank Financial experienced a net cash enhancement of $120 million in 2019, bringing their total cash and cash equivalents to $210 million.
response = bedrock_agent_runtime_client.retrieve_and_generate(
    input={
        "text": query
    },
    retrieveAndGenerateConfiguration={
        "type": "KNOWLEDGE_BASE",
        "knowledgeBaseConfiguration": {
            'knowledgeBaseId': kb_id_hierarchical,
            "modelArn": "arn:aws:bedrock:{}::foundation-model/{}".format(region, foundation_model),
            "retrievalConfiguration": {
                "vectorSearchConfiguration": {
                    "numberOfResults":5
                } 
            }
        }
    }
)

print(response['output']['text'],end='\n'*2)

As you can see, with the RetrieveAndGenerate API we get the final response directly. Now let's look at the citations it returns, since the primary focus of this notebook is to observe the retrieved chunks and citations used by the model while generating the response. When we provide relevant context to the foundation model along with the query, it is more likely to generate a high-quality response.

response_hierarchical = response['citations'][0]['retrievedReferences']
print("# of citations or chunks used to generate the response: ", len(response_hierarchical))
citations_rag_print(response_hierarchical)

Let's now retrieve the source information from the knowledge base with the retrieve API.

Testing Knowledge Base with Retrieve API

If you need an extra layer of control, you can retrieve the chunks that best match your query using the Retrieve API. In this setup, you can configure the desired number of results and control the final answer with your own application logic. The API then provides you with the matching content, its S3 location, the similarity score, and the chunk metadata.

response_hierarchical_ret = bedrock_agent_runtime_client.retrieve(
    knowledgeBaseId=kb_id_hierarchical,
    retrievalConfiguration={
        "vectorSearchConfiguration": {
            "numberOfResults":5,
        } 
    },
    retrievalQuery={
        'text': query
    }
)

print("# of retrieved results: ", len(response_hierarchical_ret['retrievalResults']))
response_print(response_hierarchical_ret)


Note: As you can see in the above response, the Retrieve API returned only 3 search results (chunks) although 5 were requested. The reason is that with hierarchical chunking, search is performed on child chunks but parent chunks are returned, and one parent chunk can have multiple child chunks. The response therefore contains only 3 chunks even though the search matched 5 child chunks.

4. Create knowledge bases with semantic chunking strategy

Concept

Semantic chunking analyzes the relationships within a text and divides it into meaningful and complete chunks, which are derived based on the semantic similarity calculated by the embedding model. This approach preserves the information’s integrity during retrieval, helping to ensure accurate and contextually appropriate results. Knowledge Bases for Amazon Bedrock first divides documents into chunks based on the specified token size. Embeddings are created for each chunk, and similar chunks in the embedding space are combined based on the similarity threshold and buffer size, forming new chunks. Consequently, the chunk size can vary across chunks.

Benefits

  • By focusing on the text’s meaning and context, semantic chunking significantly improves the quality of retrieval. It should be used in scenarios where maintaining the semantic integrity of the text is crucial.

  • Although this method is more computationally intensive than fixed-size chunking, it can be beneficial for chunking documents where contextual boundaries aren’t clear—for example, legal documents or technical manuals.[1]
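To illustrate the idea of merging neighbouring text by similarity, here is a toy sketch. It compares adjacent sentences with bag-of-words cosine similarity and starts a new chunk when the similarity falls below a threshold. Real semantic chunking uses an embedding model, token limits, and a buffer size, so this is an assumption-laden simplification rather than the service's algorithm; the function names and the threshold are hypothetical.

```python
import math
import re
from collections import Counter
from typing import List


def _bag_of_words(sentence: str) -> Counter:
    """Lower-cased word counts, used here in place of a real embedding."""
    return Counter(re.findall(r"[a-z0-9]+", sentence.lower()))


def _cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[t] * b[t] for t in a.keys() & b.keys())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def semantic_chunks(text: str, similarity_threshold: float = 0.2) -> List[str]:
    """Group consecutive sentences; start a new chunk when adjacent sentences are dissimilar."""
    sentences = [s.strip() for s in re.split(r"(?<=[.!?])\s+", text) if s.strip()]
    if not sentences:
        return []
    groups = [[sentences[0]]]
    for previous, current in zip(sentences, sentences[1:]):
        if _cosine(_bag_of_words(previous), _bag_of_words(current)) >= similarity_threshold:
            groups[-1].append(current)
        else:
            groups.append([current])
    return [" ".join(group) for group in groups]


sample = ("Cash flow from operations rose. Cash flow from investing fell. "
          "The board met in Seattle. The board approved a dividend.")
for chunk in semantic_chunks(sample):
    print(chunk)
```

Chunk sizes vary with the content: sentences on the same topic stay together, and a topic shift produces a boundary.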

Parameter values:

"chunkingStrategy": "FIXED_SIZE | NONE | HIERARCHICAL | SEMANTIC"
knowledge_base_semantic = BedrockKnowledgeBase(
    kb_name=f'{knowledge_base_name_semantic}-{suffix}',
    kb_description=knowledge_base_description,
    data_bucket_name=bucket_name, 
    chunking_strategy = "SEMANTIC", 
    suffix = f'{suffix}-s'
)

Now start the ingestion job. Since we are using the same documents as for fixed chunking, we skip the step of uploading documents to the S3 bucket.

# ensure that the kb is available
time.sleep(30)
# sync knowledge base
knowledge_base_semantic.start_ingestion_job()
kb_id_semantic = knowledge_base_semantic.get_knowledge_base_id()

4.1 Test the Knowledge Base

Now that the Knowledge Base is available, we can test it using the retrieve and retrieve_and_generate functions.

Testing Knowledge Base with Retrieve and Generate API

Let's first test the knowledge base using the retrieve and generate API. With this API, Bedrock takes care of retrieving the necessary references from the knowledge base and generating the final answer using a foundation model from Bedrock.

query = Provide a summary of consolidated statements of cash flows of Octank Financial for the fiscal years ended December 31, 2019.

The right response for this query as per ground truth QA pair is:

The cash flow statement for Octank Financial in the year ended December 31, 2019 reveals the following:
- Cash generated from operating activities amounted to $710 million, which can be attributed to a $700 million profit and non-cash charges such as depreciation and amortization.
- Cash outflow from investing activities totaled $240 million, with major expenditures being the acquisition of property, plant, and equipment ($200 million) and marketable securities ($60 million), partially offset by the sale of property, plant, and equipment ($40 million) and maturing marketable securities ($20 million).
- Financing activities resulted in a cash inflow of $350 million, stemming from the issuance of common stock ($200 million) and long-term debt ($300 million), while common stock repurchases ($50 million) and long-term debt payments ($100 million) reduced the cash inflow. 
Overall, Octank Financial experienced a net cash enhancement of $120 million in 2019, bringing their total cash and cash equivalents to $210 million.
time.sleep(20)

response = bedrock_agent_runtime_client.retrieve_and_generate(
    input={
        "text": query
    },
    retrieveAndGenerateConfiguration={
        "type": "KNOWLEDGE_BASE",
        "knowledgeBaseConfiguration": {
            'knowledgeBaseId': kb_id_semantic,
            "modelArn": "arn:aws:bedrock:{}::foundation-model/{}".format(region, foundation_model),
            "retrievalConfiguration": {
                "vectorSearchConfiguration": {
                    "numberOfResults":5
                } 
            }
        }
    }
)

print(response['output']['text'],end='\n'*2)

As you can see, with the RetrieveAndGenerate API we get the final response directly. Now let's look at the citations it returns, since the primary focus of this notebook is to observe the retrieved chunks and citations used by the model while generating the response. When we provide relevant context to the foundation model along with the query, it is more likely to generate a high-quality response.

response_semantic = response['citations'][0]['retrievedReferences']
print("# of citations or chunks used to generate the response: ", len(response_semantic))
citations_rag_print(response_semantic)

Let's now retrieve the source information from the knowledge base with the retrieve API.

Testing Knowledge Base with Retrieve API

If you need an extra layer of control, you can retrieve the chunks that best match your query using the Retrieve API. In this setup, you can configure the desired number of results and control the final answer with your own application logic. The API then provides you with the matching content, its S3 location, the similarity score, and the chunk metadata.

response_semantic_ret = bedrock_agent_runtime_client.retrieve(
    knowledgeBaseId=kb_id_semantic,
    retrievalConfiguration={
        "vectorSearchConfiguration": {
            "numberOfResults":5,
        } 
    },
    retrievalQuery={
        'text': query
    }
)
print("# of retrieved results: ", len(response_semantic_ret['retrievalResults']))
response_print(response_semantic_ret)

5. Custom chunking option using Lambda Functions

When creating a knowledge base (KB) for Amazon Bedrock, you can connect a Lambda function to specify your custom chunking logic. During ingestion, if a Lambda function is provided, Knowledge Bases runs the function and stores its input and output values in the intermediate S3 bucket you provide.


Note: A Lambda function can be used with a KB to add custom chunking logic as well as to process your chunks, for example by adding chunk-level metadata. In this example we focus on using a Lambda function for custom chunking logic.

5.1 Create the Lambda Function

We will now create a lambda function which will have code for custom chunking. To do so we will:

  1. Create the lambda_function.py file which contains the logic for custom chunking.
  2. Create the IAM role for our Lambda function.
  3. Create the lambda function with the required permissions.

Create the function code

Let's create the Lambda function that implements the logic for reading your file from the intermediate bucket, processing the contents with custom chunking logic, and writing the output back to the S3 bucket.

%%writefile lambda_function.py
import json
from abc import abstractmethod, ABC
from typing import List
from urllib.parse import urlparse
import boto3
import logging
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

class Chunker(ABC):
    @abstractmethod
    def chunk(self, text: str) -> List[str]:
        raise NotImplementedError()

class SimpleChunker(Chunker):
    def chunk(self, text: str) -> List[str]:
        words = text.split()
        return [' '.join(words[i:i+100]) for i in range(0, len(words), 100)]

def lambda_handler(event, context):
    logger.debug('input={}'.format(json.dumps(event)))
    s3 = boto3.client('s3')

    # Extract relevant information from the input event
    input_files = event.get('inputFiles')
    input_bucket =  event.get('bucketName')


    if not all([input_files, input_bucket]):
        raise ValueError("Missing required input parameters")

    output_files = []
    chunker = SimpleChunker()

    for input_file in input_files:
        content_batches = input_file.get('contentBatches', [])
        file_metadata = input_file.get('fileMetadata', {})
        original_file_location = input_file.get('originalFileLocation', {})

        processed_batches = []

        for batch in content_batches:
            input_key = batch.get('key')

            if not input_key:
                raise ValueError("Missing key in content batch")

            # Read file from S3
            file_content = read_s3_file(s3, input_bucket, input_key)

            # Process content (chunking)
            chunked_content = process_content(file_content, chunker)

            output_key = f"Output/{input_key}"

            # Write processed content back to S3
            write_to_s3(s3, input_bucket, output_key, chunked_content)

            # Add processed batch information
            processed_batches.append({
                'key': output_key
            })

        # Prepare output file information
        output_file = {
            'originalFileLocation': original_file_location,
            'fileMetadata': file_metadata,
            'contentBatches': processed_batches
        }
        output_files.append(output_file)

    result = {'outputFiles': output_files}

    return result


def read_s3_file(s3_client, bucket, key):
    response = s3_client.get_object(Bucket=bucket, Key=key)
    return json.loads(response['Body'].read().decode('utf-8'))

def write_to_s3(s3_client, bucket, key, content):
    s3_client.put_object(Bucket=bucket, Key=key, Body=json.dumps(content))    

def process_content(file_content: dict, chunker: Chunker) -> dict:
    chunked_content = {
        'fileContents': []
    }

    for content in file_content.get('fileContents', []):
        content_body = content.get('contentBody', '')
        content_type = content.get('contentType', '')
        content_metadata = content.get('contentMetadata', {})

        # Split the body text into chunks with the configured chunker
        chunks = chunker.chunk(content_body)

        for chunk in chunks:
            chunked_content['fileContents'].append({
                'contentType': content_type,
                'contentMetadata': content_metadata,
                'contentBody': chunk
            })

    return chunked_content
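Before deploying, it can be useful to exercise the chunking logic locally without S3. The sketch below re-implements the same 100-word splitting and the `fileContents` structure used by `process_content` above, then runs it on a hand-written sample. The sample values (such as the `contentType` of `"TEXT"` and the metadata key) are hypothetical placeholders, and the function names are local stand-ins rather than the deployed code.

```python
from typing import List


def chunk_words(text: str, size: int = 100) -> List[str]:
    """Split text into consecutive groups of at most `size` words."""
    words = text.split()
    return [" ".join(words[i:i + size]) for i in range(0, len(words), size)]


def process_content_locally(file_content: dict, size: int = 100) -> dict:
    """Produce one output entry per chunk, copying contentType and contentMetadata from the source entry."""
    output = {"fileContents": []}
    for item in file_content.get("fileContents", []):
        for chunk in chunk_words(item.get("contentBody", ""), size):
            output["fileContents"].append({
                "contentType": item.get("contentType", ""),
                "contentMetadata": item.get("contentMetadata", {}),
                "contentBody": chunk,
            })
    return output


# Hypothetical stand-in for the JSON file the Lambda function reads from the intermediate bucket
sample_file = {
    "fileContents": [
        {
            "contentType": "TEXT",
            "contentMetadata": {"source": "sample"},
            "contentBody": " ".join(f"word{i}" for i in range(250)),
        }
    ]
}
chunked = process_content_locally(sample_file)
print([len(entry["contentBody"].split()) for entry in chunked["fileContents"]])
```

Swapping `chunk_words` for a different splitter is all that is needed to try another custom strategy locally.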

The standard chunking strategy values provided by Knowledge Bases are as follows:

Parameter values:

"chunkingStrategy": "FIXED_SIZE | NONE | HIERARCHICAL | SEMANTIC"

To implement our custom logic, we have included an option in the knowledge_base.py class for passing a value of CUSTOM. If you pass the chunking strategy as CUSTOM to this class, it will do the following:

  1. It sets the chunkingStrategy to NONE.
  2. It adds customTransformationConfiguration to the vectorIngestionConfiguration as follows:
{
...
   "vectorIngestionConfiguration": {
    "customTransformationConfiguration": { 
         "intermediateStorage": { 
            "s3Location": { 
               "uri": "string"
            }
         },
         "transformations": [
            {
               "transformationFunction": {
                  "lambdaConfiguration": {
                     "lambdaArn": "string"
                  }
               },
               "stepToApply": "string" // enum of POST_CHUNKING
            }
         ]
      },
      "chunkingConfiguration": {
         "chunkingStrategy": "NONE"
         ...
   }
}
knowledge_base_custom = BedrockKnowledgeBase(
    kb_name=f'{knowledge_base_name_custom}-{suffix}',
    kb_description=knowledge_base_description,
    data_bucket_name=bucket_name,
    lambda_function_name=lambda_function_name,
    intermediate_bucket_name=intermediate_bucket_name, 
    chunking_strategy = "CUSTOM", 
    suffix = f'{suffix}-c'
)

Now start the ingestion job.

# ensure that the kb is available
time.sleep(30)
# sync knowledge base
knowledge_base_custom.start_ingestion_job()
kb_id_custom = knowledge_base_custom.get_knowledge_base_id()

5.2 Test the Knowledge Base

Now that the Knowledge Base is available, we can test it using the retrieve and retrieve_and_generate functions.

Testing Knowledge Base with Retrieve and Generate API

Let's first test the knowledge base using the retrieve and generate API. With this API, Bedrock takes care of retrieving the necessary references from the knowledge base and generating the final answer using a foundation model from Bedrock.

query = Provide a summary of consolidated statements of cash flows of Octank Financial for the fiscal years ended December 31, 2019.

The right response for this query as per ground truth QA pair is:

The cash flow statement for Octank Financial in the year ended December 31, 2019 reveals the following:
- Cash generated from operating activities amounted to $710 million, which can be attributed to a $700 million profit and non-cash charges such as depreciation and amortization.
- Cash outflow from investing activities totaled $240 million, with major expenditures being the acquisition of property, plant, and equipment ($200 million) and marketable securities ($60 million), partially offset by the sale of property, plant, and equipment ($40 million) and maturing marketable securities ($20 million).
- Financing activities resulted in a cash inflow of $350 million, stemming from the issuance of common stock ($200 million) and long-term debt ($300 million), while common stock repurchases ($50 million) and long-term debt payments ($100 million) reduced the cash inflow. 
Overall, Octank Financial experienced a net cash enhancement of $120 million in 2019, bringing their total cash and cash equivalents to $210 million.
time.sleep(10)

response = bedrock_agent_runtime_client.retrieve_and_generate(
    input={
        "text": query
    },
    retrieveAndGenerateConfiguration={
        "type": "KNOWLEDGE_BASE",
        "knowledgeBaseConfiguration": {
            'knowledgeBaseId': kb_id_custom,
            "modelArn": "arn:aws:bedrock:{}::foundation-model/{}".format(region, foundation_model),
            "retrievalConfiguration": {
                "vectorSearchConfiguration": {
                    "numberOfResults":5
                } 
            }
        }
    }
)

print(response['output']['text'],end='\n'*2)

As you can see, with the RetrieveAndGenerate API we get the final response directly. Now let's look at the citations it returns, since the primary focus of this notebook is to observe the retrieved chunks and citations used by the model while generating the response. When we provide relevant context to the foundation model along with the query, it is more likely to generate a high-quality response.

response_custom = response['citations'][0]['retrievedReferences']
print("# of citations or chunks used to generate the response: ", len(response_custom))
citations_rag_print(response_custom)
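
The cell above reads only the first entry of `response['citations']`. A response can carry several citations, one per generated response part, so a minimal sketch for gathering every retrieved reference could look like the following. The helper name `collect_references` and the mock response are hypothetical and are not part of the notebook's `utils` package; the dictionary keys follow the RetrieveAndGenerate response shape.

```python
def collect_references(rag_response):
    """Flatten retrievedReferences across all citations, skipping duplicates."""
    seen = set()
    references = []
    for citation in rag_response.get("citations", []):
        for ref in citation.get("retrievedReferences", []):
            text = ref.get("content", {}).get("text", "")
            uri = ref.get("location", {}).get("s3Location", {}).get("uri", "")
            key = (uri, text)  # the same chunk may back several response parts
            if key not in seen:
                seen.add(key)
                references.append(ref)
    return references


# Mock response (hypothetical values) used only to illustrate the structure.
sample_response = {
    "citations": [
        {"retrievedReferences": [
            {"content": {"text": "chunk A"},
             "location": {"s3Location": {"uri": "s3://example-bucket/report.pdf"}}},
        ]},
        {"retrievedReferences": [
            {"content": {"text": "chunk A"},
             "location": {"s3Location": {"uri": "s3://example-bucket/report.pdf"}}},
            {"content": {"text": "chunk B"},
             "location": {"s3Location": {"uri": "s3://example-bucket/report.pdf"}}},
        ]},
    ]
}

all_refs = collect_references(sample_response)
print("# of unique chunks cited: ", len(all_refs))
```

Passing the real `response` object instead of `sample_response` should give the full set of chunks the model cited, assuming the response follows the same shape.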

Let's now retrieve the source information from the knowledge base with the retrieve API.

Testing Knowledge Base with Retrieve API

If you need an extra layer of control, you can retrieve the chunks that best match your query using the Retrieve API. In this setup, you configure the desired number of results and control the final answer with your own application logic. The API returns the matching content, its S3 location, the similarity score, and the chunk metadata.

# nextToken is only needed for pagination; pass the value returned by a previous call, if any.
response_custom_ret = bedrock_agent_runtime_client.retrieve(
    knowledgeBaseId=kb_id_custom,
    retrievalConfiguration={
        "vectorSearchConfiguration": {
            "numberOfResults": 5,
        }
    },
    retrievalQuery={
        'text': query
    }
)
print("# of chunks retrieved: ", len(response_custom_ret['retrievalResults']))
response_print(response_custom_ret)
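
To apply your own application logic to the retrieved chunks, you can reduce each result to the fields mentioned above: content, S3 location, similarity score, and metadata. The following is a minimal sketch; `summarize_results` is a hypothetical helper and the mock response holds placeholder values, not output from the knowledge base.

```python
def summarize_results(retrieve_response, max_chars=80):
    """Return one compact dict per retrieved chunk, highest score first."""
    rows = []
    for result in retrieve_response.get("retrievalResults", []):
        text = result.get("content", {}).get("text", "")
        rows.append({
            "score": result.get("score"),
            "uri": result.get("location", {}).get("s3Location", {}).get("uri"),
            "metadata": result.get("metadata", {}),
            "snippet": text[:max_chars],  # truncate long chunks for display
        })
    # Treat a missing score as 0.0 so sorting never fails.
    rows.sort(key=lambda row: row["score"] or 0.0, reverse=True)
    return rows


# Mock Retrieve response (placeholder values) to show the expected shape.
sample_retrieve_response = {
    "retrievalResults": [
        {"content": {"text": "Financing activities ..."}, "score": 0.41,
         "location": {"s3Location": {"uri": "s3://example-bucket/report.pdf"}},
         "metadata": {}},
        {"content": {"text": "Operating activities ..."}, "score": 0.57,
         "location": {"s3Location": {"uri": "s3://example-bucket/report.pdf"}},
         "metadata": {}},
    ]
}

summary = summarize_results(sample_retrieve_response)
for row in summary:
    print(row["score"], row["uri"], row["snippet"])
```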

In each case, evaluating a single query returned the correct response. However, when you build a RAG application, you need to evaluate a large number of questions and answers to measure accuracy improvements. In the next step, we will use the open source RAG Assessment (RAGAS) framework to evaluate responses on your dataset, using metrics that assess the quality of the retrieved context or search results. We will focus on only two metrics:

  1. Context recall
  2. Context relevancy

6. Evaluating search results using RAG Assessment (RAGAS) framework on your dataset

You can use the RAGAS framework to evaluate your results for each chunking strategy. This approach gives you data-driven guidance on which chunking strategy to use for your dataset.

Ideally, you should also tune other parameters. For example, with hierarchical chunking, consider trying different sizes for the parent and child chunks.

The approach below provides a heuristic for choosing a strategy, based on the default parameters recommended by Knowledge Bases for Amazon Bedrock.

print("Semantic: ", kb_id_semantic)
print("Standard: ", kb_id_standard)
print("Hierarchical: ", kb_id_hierarchical)
print("Custom chunking: ", kb_id_custom)

Evaluation

In this section we will use RAGAS to evaluate search results with the following metrics:

  1. Context recall: This metric measures the extent to which the retrieved context aligns with the annotated answer, treated as the ground truth. It is computed based on the ground truth and the retrieved context, and the values range between 0 and 1, with higher values indicating better performance.
  2. Context relevancy: This metric gauges the relevancy of the retrieved context, calculated based on both the question and contexts. The values fall within the range of (0, 1), with higher values indicating better relevancy.
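
To build intuition for context recall, the sketch below computes a simple lexical-overlap proxy: the fraction of ground-truth sentences whose words mostly appear in the retrieved contexts. This is not how RAGAS computes the metric (RAGAS relies on an LLM to judge whether each statement is supported); the function `toy_context_recall`, the `threshold` parameter, and the sample strings are assumptions made for illustration only.

```python
import re


def _tokens(text):
    """Lowercase alphanumeric tokens of a string, as a set."""
    return set(re.findall(r"[a-z0-9]+", text.lower()))


def toy_context_recall(ground_truth, contexts, threshold=0.6):
    """Fraction of ground-truth sentences lexically covered by the contexts."""
    context_tokens = set()
    for context in contexts:
        context_tokens |= _tokens(context)

    sentences = [s.strip() for s in re.split(r"(?<=[.!?])\s+", ground_truth) if s.strip()]
    if not sentences:
        return 0.0

    supported = 0
    for sentence in sentences:
        tokens = _tokens(sentence)
        # A sentence counts as supported when enough of its words occur in the contexts.
        if tokens and len(tokens & context_tokens) / len(tokens) >= threshold:
            supported += 1
    return supported / len(sentences)


truth = "Operating activities generated 710 million. Financing activities provided 350 million."
retrieved = ["Cash generated from operating activities amounted to 710 million."]
print(toy_context_recall(truth, retrieved))  # 0.5: only the first sentence is covered
```

A value of 1.0 would mean every ground-truth sentence is covered by the retrieved chunks, which matches the direction of the real metric: higher is better.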
from utils.evaluation import KnowledgeBasesEvaluations

from ragas.metrics import (
    context_recall,
    context_relevancy,
    )

metrics = [context_recall,
           context_relevancy
           ]

MODEL_ID_EVAL = "anthropic.claude-3-sonnet-20240229-v1:0"
MODEL_ID_GEN = "anthropic.claude-3-haiku-20240307-v1:0"

questions = [
        "Provide a summary of consolidated statements of cash flows of Octank Financial for the fiscal years ended December 31, 2019.",
]
ground_truths = [
    "The cash flow statement for Octank Financial in the year ended December 31, 2019 reveals the following:\
- Cash generated from operating activities amounted to $710 million, which can be attributed to a $700 million profit and non-cash charges such as depreciation and amortization.\
- Cash outflow from investing activities totaled $240 million, with major expenditures being the acquisition of property, plant, and equipment ($200 million) and marketable securities ($60 million), partially offset by the sale of property, plant, and equipment ($40 million) and maturing marketable securities ($20 million).\
- Financing activities resulted in a cash inflow of $350 million, stemming from the issuance of common stock ($200 million) and long-term debt ($300 million), while common stock repurchases ($50 million) and long-term debt payments ($100 million) reduced the cash inflow. \
Overall, Octank Financial experienced a net cash enhancement of $120 million in 2019, bringing their total cash and cash equivalents to $210 million.",
]
kb_evaluate_standard = KnowledgeBasesEvaluations(model_id_eval=MODEL_ID_EVAL, 
                        model_id_generation=MODEL_ID_GEN, 
                        metrics=metrics,
                        questions=questions, 
                        ground_truth=ground_truths, 
                        KB_ID=kb_id_standard,
                        )

kb_evaluate_hierarchical = KnowledgeBasesEvaluations(model_id_eval=MODEL_ID_EVAL, 
                        model_id_generation=MODEL_ID_GEN, 
                        metrics=metrics,
                        questions=questions, 
                        ground_truth=ground_truths, KB_ID=kb_id_hierarchical)

kb_evaluate_semantic = KnowledgeBasesEvaluations(model_id_eval=MODEL_ID_EVAL, 
                        model_id_generation=MODEL_ID_GEN, 
                        metrics=metrics,
                        questions=questions, 
                        ground_truth=ground_truths, KB_ID=kb_id_semantic)

kb_evaluate_custom = KnowledgeBasesEvaluations(model_id_eval=MODEL_ID_EVAL, 
                        model_id_generation=MODEL_ID_GEN, 
                        metrics=metrics,
                        questions=questions, 
                        ground_truth=ground_truths, KB_ID=kb_id_custom)
results_hierarchical = kb_evaluate_hierarchical.evaluate()
results_standard = kb_evaluate_standard.evaluate()
results_semantic = kb_evaluate_semantic.evaluate()
results_custom = kb_evaluate_custom.evaluate()
import pandas as pd
pd.options.display.max_colwidth = 800
print("Fixed Chunking Evaluation for synthetic 10K report")
print("--------------------------------------------------------------------")
print("Average context_recall: ", results_standard["context_recall"].mean())
print("Average context_relevancy: ", results_standard["context_relevancy"].mean(), "\n")

print("Hierarchical Chunking Evaluation for synthetic 10K report")
print("--------------------------------------------------------------------")
print("Average context_recall: ", results_hierarchical["context_recall"].mean())
print("Average context_relevancy: ", results_hierarchical["context_relevancy"].mean(), "\n")

print("Semantic Chunking Evaluation for synthetic 10K report")
print("--------------------------------------------------------------------")
print("Average context_recall: ", results_semantic["context_recall"].mean())
print("Average context_relevancy: ", results_semantic["context_relevancy"].mean(), "\n")

print("Custom Chunking Evaluation for synthetic 10K report")
print("--------------------------------------------------------------------")
print("Average context_recall: ", results_custom["context_recall"].mean())
print("Average context_relevancy: ", results_custom["context_relevancy"].mean())
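
When you evaluate more than a handful of questions, it can be easier to rank the strategies by their average score than to compare printed values by eye. The following is a minimal sketch under stated assumptions: `rank_strategies` is a hypothetical helper, the strategy names are generic, and the scores are placeholder numbers rather than results from this notebook.

```python
from statistics import mean


def rank_strategies(results_by_strategy, metric="context_recall"):
    """Return (strategy, average score) pairs sorted from best to worst."""
    averages = {
        name: mean(scores[metric])
        for name, scores in results_by_strategy.items()
    }
    return sorted(averages.items(), key=lambda item: item[1], reverse=True)


# Placeholder scores for illustration only; they are NOT evaluation results.
placeholder_results = {
    "strategy_a": {"context_recall": [0.5, 1.0]},
    "strategy_b": {"context_recall": [1.0, 1.0]},
    "strategy_c": {"context_recall": [0.5, 0.5]},
}

ranking = rank_strategies(placeholder_results)
for name, average in ranking:
    print(f"{name}: {average:.2f}")
```

The helper only needs each metric to be an iterable of numbers, so it may also accept the per-strategy results returned by `evaluate()` if those expose the metric columns that way.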
print("===============================Knowledge base with fixed chunking==============================\n")
knowledge_base_standard.delete_kb(delete_s3_bucket=True, delete_iam_roles_and_policies=True)
print("===============================Knowledge base with hierarchical chunking==============================\n")
knowledge_base_hierarchical.delete_kb(delete_s3_bucket=False,delete_iam_roles_and_policies=True)
print("===============================Knowledge base with semantic chunking==============================\n")
knowledge_base_semantic.delete_kb(delete_s3_bucket=False,delete_iam_roles_and_policies=True)
print("===============================Knowledge base with custom chunking==============================\n")
knowledge_base_custom.delete_kb(delete_s3_bucket=True,delete_iam_roles_and_policies=True, delete_lambda_function = True)