1+ import boto3
2+ import json
3+ import datetime
4+ import time
5+
class DateTimeEncoder(json.JSONEncoder):
    """JSON encoder that serializes datetime objects as ISO-8601 strings."""

    def default(self, obj):
        # Anything that is not a datetime defers to the base class, which
        # raises TypeError for unserializable objects (standard contract).
        if not isinstance(obj, datetime.datetime):
            return super().default(obj)
        return obj.isoformat()
11+
def get_knowledge_base_id(knowledge_base_name, region_name, bedrock_agent):
    """Resolve a Bedrock knowledge base name to its ID.

    Args:
        knowledge_base_name: Exact ``name`` of the knowledge base to find.
        region_name: Unused here; kept for signature compatibility with the
            other helpers in this module (the client is passed in directly).
        bedrock_agent: A boto3 ``bedrock-agent`` client.

    Returns:
        The ``knowledgeBaseId`` string of the matching knowledge base.

    Raises:
        ValueError: If no knowledge base with that name exists.
    """
    # list_knowledge_bases is paginated; the previous version only looked at
    # the first page, so a KB beyond page 1 was falsely reported as missing.
    kwargs = {}
    while True:
        response = bedrock_agent.list_knowledge_bases(**kwargs)
        for kb in response['knowledgeBaseSummaries']:
            if kb['name'] == knowledge_base_name:
                return kb['knowledgeBaseId']
        token = response.get('nextToken')
        if not token:
            break
        kwargs = {'nextToken': token}
    raise ValueError(f"Knowledge base '{knowledge_base_name}' not found")
18+
def get_or_create_data_source(knowledge_base_id, language, region_name, bedrock_agent):
    """Find an existing data source for *language*, or create a new S3-backed one.

    Args:
        knowledge_base_id: ID of the knowledge base to attach the source to.
        language: SDK/language tag; also selects the naming scheme.
        region_name: Unused here; kept for signature compatibility with the
            other helpers in this module.
        bedrock_agent: A boto3 ``bedrock-agent`` client.

    Returns:
        Tuple ``(data_source_id, data_source_name, is_new)`` where ``is_new``
        is True only when a new data source was created.
    """
    # Collect ALL existing data sources; list_data_sources is paginated, and
    # the previous version only examined the first page.
    data_sources = []
    kwargs = {'knowledgeBaseId': knowledge_base_id}
    while True:
        response = bedrock_agent.list_data_sources(**kwargs)
        data_sources.extend(response['dataSourceSummaries'])
        token = response.get('nextToken')
        if not token:
            break
        kwargs['nextToken'] = token

    # Reuse an existing data source whose name contains the language tag.
    # NOTE(review): this is a substring match — e.g. "java" would also match
    # "javascript-..." names; confirm data-source names are unambiguous.
    for ds in data_sources:
        if language in ds['name'] and ds['name'] != "default":
            return ds['dataSourceId'], ds['name'], False  # Found existing

    # Shared document sets use plain names; SDK languages use "premium" names.
    if language in ["steering-docs", "final-specs"]:
        ds_name = f"{language}-data-source"
        bucket_name = f"{language}-bucket"
    else:
        ds_name = f"{language}-premium-data-source"
        bucket_name = f"{language}-premium-bucket"

    # No match found — create a new S3 data source with hierarchical chunking
    # (parent chunks of 1500 tokens, child chunks of 300, 75-token overlap).
    response = bedrock_agent.create_data_source(
        knowledgeBaseId=knowledge_base_id,
        name=ds_name,
        dataSourceConfiguration={
            "type": "S3",
            "s3Configuration": {
                "bucketArn": f"arn:aws:s3:::{bucket_name}"
            }
        },
        vectorIngestionConfiguration={
            "chunkingConfiguration": {
                "chunkingStrategy": "HIERARCHICAL",
                "hierarchicalChunkingConfiguration": {
                    "levelConfigurations": [
                        {"maxTokens": 1500},
                        {"maxTokens": 300}
                    ],
                    "overlapTokens": 75
                }
            }
        }
    )
    return response['dataSource']['dataSourceId'], response['dataSource']['name'], True  # Created new
62+
def sync_data_source(knowledge_base_id, data_source_id, region_name, bedrock_agent):
    """Start an ingestion (sync) job for the given data source.

    Args:
        knowledge_base_id: ID of the owning knowledge base.
        data_source_id: ID of the data source to ingest.
        region_name: Unused here; kept for signature compatibility.
        bedrock_agent: A boto3 ``bedrock-agent`` client.

    Returns:
        The raw ``start_ingestion_job`` response (contains ``ingestionJob``).
    """
    return bedrock_agent.start_ingestion_job(
        knowledgeBaseId=knowledge_base_id,
        dataSourceId=data_source_id,
    )
69+
def monitor_ingestion_job(knowledge_base_id, data_source_id, ingestion_job_id, region_name, bedrock_agent):
    """Poll an ingestion job until it reaches a terminal state or we time out.

    Args:
        knowledge_base_id: ID of the owning knowledge base.
        data_source_id: ID of the data source being ingested.
        ingestion_job_id: ID returned by ``start_ingestion_job``.
        region_name: Unused here; kept for signature compatibility.
        bedrock_agent: A boto3 ``bedrock-agent`` client.

    Returns:
        The final ``get_ingestion_job`` response on COMPLETE/FAILED/STOPPED,
        or a ``{"status": "TIMEOUT", ...}`` dict if the job is still running
        after ``max_attempts`` polls.
    """
    poll_interval_seconds = 5
    max_attempts = 100

    for _ in range(max_attempts):
        job_status = bedrock_agent.get_ingestion_job(
            knowledgeBaseId=knowledge_base_id,
            dataSourceId=data_source_id,
            ingestionJobId=ingestion_job_id
        )

        status = job_status['ingestionJob']['status']
        print(f"Current status: {status} - {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

        if status in ['COMPLETE', 'FAILED', 'STOPPED']:
            return job_status

        time.sleep(poll_interval_seconds)

    # Bug fix: the message previously claimed "5 minutes" although
    # 100 polls x 5 s is ~8.3 minutes; derive it from the actual parameters.
    return {
        "status": "TIMEOUT",
        "message": f"Job monitoring timed out after {max_attempts * poll_interval_seconds} seconds"
    }
91+
def lambda_handler(event, context):
    """Lambda entry point: sync a Bedrock knowledge-base data source.

    Resolves the knowledge base for the requested language, gets or creates
    its S3 data source, starts an ingestion job, waits for it to finish, and
    returns a JSON summary of the run.

    Args:
        event: Lambda event; reads optional ``language`` (default "python")
            and ``region_name`` (default "us-west-2").
        context: Lambda context (unused).

    Returns:
        ``{'statusCode': 200, 'body': <JSON string>}`` where the body holds
        data-source info, ingestion-job status, and (if present) statistics.
    """
    language = event.get('language', 'python')
    region_name = event.get('region_name', 'us-west-2')

    # Shared document sets use plain KB names; SDK languages use "premium".
    # NOTE(review): "coding-standards" appears here but not in the analogous
    # list inside get_or_create_data_source — confirm that is intentional.
    if language in ["steering-docs", "final-specs", "coding-standards"]:
        knowledge_base_name = f"{language}-KB"
    else:
        knowledge_base_name = f"{language}-premium-KB"

    bedrock_agent = boto3.client('bedrock-agent', region_name=region_name)
    knowledge_base_id = get_knowledge_base_id(knowledge_base_name, region_name, bedrock_agent)

    data_source_id, data_source_name, is_new = get_or_create_data_source(
        knowledge_base_id, language, region_name, bedrock_agent
    )

    results = {
        "data_source": {
            "id": data_source_id,
            "name": data_source_name,
            "is_new": is_new,
        },
        "ingestion_job": None,
        "statistics": None,
    }

    # Kick off the sync and record the job ID before blocking on completion.
    print(f"Syncing data source {data_source_name}...")
    sync_result = sync_data_source(knowledge_base_id, data_source_id, region_name, bedrock_agent)
    ingestion_job_id = sync_result['ingestionJob']['ingestionJobId']
    results["ingestion_job"] = {"id": ingestion_job_id, "status": "STARTED"}

    final_status = monitor_ingestion_job(
        knowledge_base_id, data_source_id, ingestion_job_id, region_name, bedrock_agent
    )

    # On TIMEOUT the monitor returns a dict without 'ingestionJob', hence
    # the defensive .get() chain and the 'UNKNOWN' fallback.
    job_details = final_status.get('ingestionJob', {})
    results["ingestion_job"]["status"] = job_details.get('status', 'UNKNOWN')

    if 'statistics' in job_details:
        stats = job_details['statistics']
        results["statistics"] = {
            "documents_processed": stats.get('numberOfDocumentsScanned', 0),
            "documents_failed": stats.get('numberOfDocumentsFailed', 0),
            "documents_indexed": stats.get('numberOfNewDocumentsIndexed', 0),
            "documents_modified_indexed": stats.get('numberOfModifiedDocumentsIndexed', 0),
        }

    return {
        'statusCode': 200,
        'body': json.dumps(results, cls=DateTimeEncoder),
    }