2025-07-21 10:45:13 -04:00

41 lines
1.3 KiB
Python

from dotenv import load_dotenv
import utils
import os
import json
import requests
from requests_auth_aws_sigv4 import AWSSigV4
load_dotenv()
#create boto3 session and client
(boto_session, agentcore_client) = utils.create_agentcore_client()
dataStoreEndpoint = os.getenv("healthlake_endpoint")
def ingest_data(patientDataFile, immunizationDataFile):
resourcePath = 'Patient'
fhirEndpoint = dataStoreEndpoint + resourcePath + '/'
auth = AWSSigV4("healthlake", session=boto_session)
with open(patientDataFile) as json_body:
patientJson = json.load(json_body)
for entry in patientJson['entry']:
print(f"FHIR Endpoint: {fhirEndpoint}, Patient Id: {entry['resource']['id']}")
r = requests.put(fhirEndpoint + entry['resource']['id'], json=entry['resource'], auth=auth)
resourcePath = 'Immunization'
fhirEndpoint = dataStoreEndpoint + resourcePath + '/'
with open(immunizationDataFile) as json_body:
immunizationJson = json.load(json_body)
print("FHIR Endpoint: ", fhirEndpoint)
for entry in immunizationJson['entry']:
print(f"Ingesting Immunization id: {entry['resource']['id']}")
r = requests.post(fhirEndpoint, json=entry['resource'], auth=auth)
if __name__ == "__main__":
ingest_data(patientDataFile="./test_data/patient.json", immunizationDataFile="./test_data/immunization.json")