diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_8_0/4570-add-support-for-warning-messages-to-the-batch-framework.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_8_0/4570-add-support-for-warning-messages-to-the-batch-framework.yaml new file mode 100644 index 00000000000..d5877a97dd8 --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_8_0/4570-add-support-for-warning-messages-to-the-batch-framework.yaml @@ -0,0 +1,4 @@ +--- +type: add +issue: 4570 +title: "The batch2 framework now supports warning messages." diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JobInstanceUtil.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JobInstanceUtil.java index 723b9256b4a..97f5ce63912 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JobInstanceUtil.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JobInstanceUtil.java @@ -60,6 +60,7 @@ class JobInstanceUtil { retVal.setCurrentGatedStepId(theEntity.getCurrentGatedStepId()); retVal.setReport(theEntity.getReport()); retVal.setEstimatedTimeRemaining(theEntity.getEstimatedTimeRemaining()); + retVal.setWarningMessages(theEntity.getWarningMessages()); return retVal; } @@ -91,6 +92,7 @@ class JobInstanceUtil { theJobInstanceEntity.setCurrentGatedStepId(theJobInstance.getCurrentGatedStepId()); theJobInstanceEntity.setReport(theJobInstance.getReport()); theJobInstanceEntity.setEstimatedTimeRemaining(theJobInstance.getEstimatedTimeRemaining()); + theJobInstanceEntity.setWarningMessages(theJobInstance.getWarningMessages()); } /** @@ -118,6 +120,7 @@ class JobInstanceUtil { retVal.setRecordsProcessed(theEntity.getRecordsProcessed()); // note: may be null out if queried NoData retVal.setData(theEntity.getSerializedData()); + retVal.setWarningMessage(theEntity.getWarningMessage()); return retVal; } } diff --git 
a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java index 2f132415f62..647036f7319 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java @@ -285,7 +285,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence { @Override @Transactional public void onWorkChunkCompletion(WorkChunkCompletionEvent theEvent) { - myWorkChunkRepository.updateChunkStatusAndClearDataForEndSuccess(theEvent.getChunkId(), new Date(), theEvent.getRecordsProcessed(), theEvent.getRecoveredErrorCount(), WorkChunkStatusEnum.COMPLETED); + myWorkChunkRepository.updateChunkStatusAndClearDataForEndSuccess(theEvent.getChunkId(), new Date(), theEvent.getRecordsProcessed(), theEvent.getRecoveredErrorCount(), WorkChunkStatusEnum.COMPLETED, theEvent.getRecoveredWarningMessage()); } @Nullable diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkRepository.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkRepository.java index 47dd72ad6af..65dd6982f2d 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkRepository.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkRepository.java @@ -47,7 +47,7 @@ public interface IBatch2WorkChunkRepository extends JpaRepository fetchChunksNoData(Pageable thePageRequest, @Param("instanceId") String theInstanceId); @@ -59,10 +59,10 @@ public interface IBatch2WorkChunkRepository extends JpaRepository { protected void init680() { Builder version = forVersion(VersionEnum.V6_8_0); - // HAPI-FHIR #4801 - Add New Index On HFJ_RESOURCE - Builder.BuilderWithTableName resourceTable = version.onTable("HFJ_RESOURCE"); + // HAPI-FHIR #4801 - Add New Index 
On HFJ_RESOURCE + Builder.BuilderWithTableName resourceTable = version.onTable("HFJ_RESOURCE"); - resourceTable - .addIndex("20230502.1", "IDX_RES_RESID_UPDATED") - .unique(false) - .online(true) - .withColumns("RES_ID", "RES_UPDATED", "PARTITION_ID"); + resourceTable + .addIndex("20230502.1", "IDX_RES_RESID_UPDATED") + .unique(false) + .online(true) + .withColumns("RES_ID", "RES_UPDATED", "PARTITION_ID"); Builder.BuilderWithTableName tagDefTable = version.onTable("HFJ_TAG_DEF"); tagDefTable.dropIndex("20230505.1", "IDX_TAGDEF_TYPESYSCODEVERUS"); @@ -158,6 +157,18 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks { .onTable("HFJ_RES_VER_PROV") .dropIndex("20230523.1", "IDX_RESVERPROV_RESVER_PID"); + // add warning message to batch job instance + version + .onTable("BT2_WORK_CHUNK") + .addColumn("20230524.1", "WARNING_MSG") + .nullable() + .type(ColumnTypeEnum.CLOB); + + version + .onTable("BT2_JOB_INSTANCE") + .addColumn("20230524.2", "WARNING_MSG") + .nullable() + .type(ColumnTypeEnum.CLOB); } protected void init660() { @@ -218,12 +229,12 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks { .type(ColumnTypeEnum.STRING, UUID_LENGTH); - Builder.BuilderAddTableByColumns resSearchUrlTable = version.addTableByColumns("20230227.1","HFJ_RES_SEARCH_URL", "RES_SEARCH_URL"); + Builder.BuilderAddTableByColumns resSearchUrlTable = version.addTableByColumns("20230227.1", "HFJ_RES_SEARCH_URL", "RES_SEARCH_URL"); - resSearchUrlTable.addColumn( "RES_SEARCH_URL").nonNullable().type(ColumnTypeEnum.STRING, 768); - resSearchUrlTable.addColumn( "RES_ID").nonNullable().type(ColumnTypeEnum.LONG); + resSearchUrlTable.addColumn("RES_SEARCH_URL").nonNullable().type(ColumnTypeEnum.STRING, 768); + resSearchUrlTable.addColumn("RES_ID").nonNullable().type(ColumnTypeEnum.LONG); - resSearchUrlTable.addColumn( "CREATED_TIME").nonNullable().type(ColumnTypeEnum.DATE_TIMESTAMP); + 
resSearchUrlTable.addColumn("CREATED_TIME").nonNullable().type(ColumnTypeEnum.DATE_TIMESTAMP); resSearchUrlTable.addIndex("20230227.2", "IDX_RESSEARCHURL_RES").unique(false).withColumns("RES_ID"); resSearchUrlTable.addIndex("20230227.3", "IDX_RESSEARCHURL_TIME").unique(false).withColumns("CREATED_TIME"); @@ -317,16 +328,16 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks { version .onTable(enversMpiLinkAuditTable) - .addColumn("20230316.4", "PARTITION_DATE") + .addColumn("20230316.4", "PARTITION_DATE") .nullable() .type(ColumnTypeEnum.DATE_ONLY); } - version - .onTable(ResourceTable.HFJ_RESOURCE) - .addColumn("20230323.1", "SEARCH_URL_PRESENT") - .nullable() - .type(ColumnTypeEnum.BOOLEAN); + version + .onTable(ResourceTable.HFJ_RESOURCE) + .addColumn("20230323.1", "SEARCH_URL_PRESENT") + .nullable() + .type(ColumnTypeEnum.BOOLEAN); { @@ -335,12 +346,12 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks { .addIndex("20230324.1", "IDX_SP_URI_HASH_URI_V2") .unique(true) .online(true) - .withColumns("HASH_URI","RES_ID","PARTITION_ID"); + .withColumns("HASH_URI", "RES_ID", "PARTITION_ID"); uriTable .addIndex("20230324.2", "IDX_SP_URI_HASH_IDENTITY_V2") .unique(true) .online(true) - .withColumns("HASH_IDENTITY","SP_URI","RES_ID","PARTITION_ID"); + .withColumns("HASH_IDENTITY", "SP_URI", "RES_ID", "PARTITION_ID"); uriTable.dropIndex("20230324.3", "IDX_SP_URI_RESTYPE_NAME"); uriTable.dropIndex("20230324.4", "IDX_SP_URI_UPDATED"); uriTable.dropIndex("20230324.5", "IDX_SP_URI"); @@ -382,7 +393,7 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks { .addIndex("20230424.1", "IDX_RL_TGT_v2") .unique(false) .online(true) - .withColumns("TARGET_RESOURCE_ID", "SRC_PATH", "SRC_RESOURCE_ID", "TARGET_RESOURCE_TYPE","PARTITION_ID"); + .withColumns("TARGET_RESOURCE_ID", "SRC_PATH", "SRC_RESOURCE_ID", "TARGET_RESOURCE_TYPE", "PARTITION_ID"); // drop and recreate FK_SPIDXSTR_RESOURCE since it will be useing the old 
IDX_SP_STRING_RESID linkTable.dropForeignKey("20230424.2", "FK_RESLINK_TARGET", "HFJ_RESOURCE"); diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/ReindexJobTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/ReindexJobTest.java index b8218dff168..3db730155bc 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/ReindexJobTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/ReindexJobTest.java @@ -10,6 +10,7 @@ import ca.uhn.fhir.interceptor.api.IAnonymousInterceptor; import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; import ca.uhn.fhir.jpa.api.dao.ReindexParameters; +import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome; import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse; import ca.uhn.fhir.jpa.model.entity.ResourceHistoryTable; import ca.uhn.fhir.jpa.model.entity.ResourceTable; @@ -367,6 +368,23 @@ public class ReindexJobTest extends BaseJpaR4Test { assertThat(myReindexTestHelper.getAlleleObservationIds(), hasSize(50)); } + @Test + public void testReindex_DuplicateResourceBeforeEnforceUniqueShouldSaveWarning() { + myReindexTestHelper.createObservationWithCode(); + myReindexTestHelper.createObservationWithCode(); + + DaoMethodOutcome searchParameter = myReindexTestHelper.createUniqueCodeSearchParameter(); + + JobInstanceStartRequest startRequest = new JobInstanceStartRequest(); + startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX); + startRequest.setParameters(new ReindexJobParameters()); + Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(startRequest); + JobInstance myJob = myBatch2JobHelper.awaitJobCompletion(startResponse); + + assertEquals(StatusEnum.COMPLETED, myJob.getStatus()); + assertNotNull(myJob.getWarningMessages()); + assertTrue(myJob.getWarningMessages().contains("Failed to reindex resource because unique search parameter " + 
searchParameter.getEntity().getIdDt().toVersionless().toString())); + } @Test public void testReindex_ExceptionThrownDuringWrite() { diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/ReindexTestHelper.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/ReindexTestHelper.java index 220760d65e5..a556bda3e08 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/ReindexTestHelper.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/ReindexTestHelper.java @@ -15,7 +15,11 @@ import ca.uhn.fhir.util.BundleUtil; import org.hl7.fhir.instance.model.api.IBaseBundle; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; +import org.hl7.fhir.r4.model.BooleanType; +import org.hl7.fhir.r4.model.CodeableConcept; +import org.hl7.fhir.r4.model.Coding; import org.hl7.fhir.r4.model.Enumerations; +import org.hl7.fhir.r4.model.Extension; import org.hl7.fhir.r4.model.Observation; import org.hl7.fhir.r4.model.Patient; import org.hl7.fhir.r4.model.SearchParameter; @@ -103,6 +107,36 @@ public class ReindexTestHelper { return daoMethodOutcome; } + public DaoMethodOutcome createUniqueCodeSearchParameter() { + createCodeSearchParameter(); + SearchParameter uniqueCodeSp = new SearchParameter(); + uniqueCodeSp.setId("SearchParameter/unique-code"); + uniqueCodeSp.addExtension(new Extension().setUrl("http://hapifhir.io/fhir/StructureDefinition/sp-unique").setValue(new BooleanType(true))); + uniqueCodeSp.setStatus(Enumerations.PublicationStatus.ACTIVE); + uniqueCodeSp.setCode("observation-code"); + uniqueCodeSp.addBase("Observation"); + uniqueCodeSp.setType(Enumerations.SearchParamType.COMPOSITE); + uniqueCodeSp.setExpression("Observation"); + uniqueCodeSp.addComponent(new SearchParameter.SearchParameterComponentComponent().setDefinition("SearchParameter/clinical-code").setExpression("Observation")); + + DaoMethodOutcome daoMethodOutcome = 
mySearchParameterDao.update(uniqueCodeSp); + mySearchParamRegistry.forceRefresh(); + return daoMethodOutcome; + } + + public DaoMethodOutcome createCodeSearchParameter() { + SearchParameter codeSp = new SearchParameter(); + codeSp.setId("SearchParameter/clinical-code"); + codeSp.setStatus(Enumerations.PublicationStatus.ACTIVE); + codeSp.setCode("code"); + codeSp.addBase("Observation"); + codeSp.setType(Enumerations.SearchParamType.TOKEN); + codeSp.setExpression("Observation.code"); + + DaoMethodOutcome daoMethodOutcome = mySearchParameterDao.update(codeSp); + mySearchParamRegistry.forceRefresh(); + return daoMethodOutcome; + } public IIdType createObservationWithAlleleExtension(Observation.ObservationStatus theStatus) { Observation observation = buildObservationWithAlleleExtension(theStatus); @@ -117,6 +151,19 @@ public class ReindexTestHelper { return observation; } + public IIdType createObservationWithCode() { + Observation observation = buildObservationWithCode(); + return myObservationDao.create(observation).getId(); + } + + public Observation buildObservationWithCode() { + Observation observation = new Observation(); + CodeableConcept codeableConcept = new CodeableConcept(); + codeableConcept.addCoding(new Coding().setCode("29463-7").setSystem("http://loinc.org").setDisplay("Body Weight")); + observation.setCode(codeableConcept); + return observation; + } + public List getEyeColourPatientIds() { return getEyeColourPatientIds(EYECOLOUR_SP_CODE, null); } diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexStep.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexStep.java index a7b37ce4989..f3601ce4034 100644 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexStep.java +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/reindex/ReindexStep.java @@ -19,9 +19,18 @@ */ package ca.uhn.fhir.batch2.jobs.reindex; -import 
ca.uhn.fhir.batch2.api.*; +import ca.uhn.fhir.batch2.api.IJobDataSink; +import ca.uhn.fhir.batch2.api.IJobStepWorker; +import ca.uhn.fhir.batch2.api.JobExecutionFailedException; +import ca.uhn.fhir.batch2.api.RunOutcome; +import ca.uhn.fhir.batch2.api.StepExecutionDetails; +import ca.uhn.fhir.batch2.api.VoidModel; import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; -import ca.uhn.fhir.jpa.api.dao.*; +import ca.uhn.fhir.jpa.api.dao.DaoRegistry; +import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; +import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao; +import ca.uhn.fhir.jpa.api.dao.ReindexOutcome; +import ca.uhn.fhir.jpa.api.dao.ReindexParameters; import ca.uhn.fhir.jpa.api.svc.IIdHelperService; import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; import ca.uhn.fhir.parser.DataFormatException; @@ -98,6 +107,7 @@ public class ReindexStep implements IJobStepWorker { */ void recoveredError(String theMessage); + /** + * Step workers may invoke this method to register a warning message processor + * + * @param theWarningProcessor The processor for the warning.
+ */ + void setWarningProcessor(IWarningProcessor theWarningProcessor); + } diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobInstance.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobInstance.java index 7414c2e801e..38038abb342 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobInstance.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobInstance.java @@ -54,6 +54,8 @@ public interface IJobInstance { String getErrorMessage(); + String getWarningMessages(); + boolean isCancelled(); String getReport(); diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java index 88dfce405a3..be15f3aeec3 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IJobPersistence.java @@ -76,6 +76,7 @@ public interface IJobPersistence extends IWorkChunkPersistence { /** * Fetches any existing jobs matching provided request parameters + * */ // on implementations @Transactional(propagation = Propagation.REQUIRES_NEW) List fetchInstances(FetchJobInstancesRequest theRequest, int theStart, int theBatchSize); @@ -97,19 +98,20 @@ public interface IJobPersistence extends IWorkChunkPersistence { /** * Fetch all job instances for a given job definition id + * */ // on implementations @Transactional(propagation = Propagation.REQUIRES_NEW) List fetchInstancesByJobDefinitionId(String theJobDefinitionId, int theCount, int theStart); /** * Fetches all job instances based on the JobFetchRequest + * * @param theRequest - the job fetch request * @return - a page of job instances */ // on implementations @Transactional(propagation = Propagation.REQUIRES_NEW) Page fetchJobInstances(JobInstanceFetchRequest theRequest); - // on implementations @Transactional(propagation = 
Propagation.REQUIRES_NEW) boolean canAdvanceInstanceToNextStep(String theInstanceId, String theCurrentStepId); @@ -130,6 +132,7 @@ public interface IJobPersistence extends IWorkChunkPersistence { /** * Fetch all chunks for a given instance. + * * @param theInstanceId - instance id * @param theWithData - whether or not to include the data * @return - an iterator for fetching work chunks @@ -233,8 +236,6 @@ public interface IJobPersistence extends IWorkChunkPersistence { @Transactional(propagation = Propagation.MANDATORY) void updateInstanceUpdateTime(String theInstanceId); - - /* * State transition events for job instances. * These cause the transitions along {@link ca.uhn.fhir.batch2.model.StatusEnum} diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IWarningProcessor.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IWarningProcessor.java new file mode 100644 index 00000000000..4a9b7166d42 --- /dev/null +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/api/IWarningProcessor.java @@ -0,0 +1,14 @@ +package ca.uhn.fhir.batch2.api; + +public interface IWarningProcessor { + + /** + * Data Sink may invoke this method to indicate that an error occurred during + * processing in work chunks but that it is non-fatal and should be saved as a warning. + * + * @param theErrorMessage An error message to be processed. 
+ */ + public void recoverWarningMessage(String theErrorMessage); + + public String getRecoveredWarningMessage(); +} diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/BaseDataSink.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/BaseDataSink.java index 11f15076678..1a709663bed 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/BaseDataSink.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/BaseDataSink.java @@ -20,22 +20,24 @@ package ca.uhn.fhir.batch2.coordinator; import ca.uhn.fhir.batch2.api.IJobDataSink; +import ca.uhn.fhir.batch2.api.IWarningProcessor; import ca.uhn.fhir.batch2.model.JobDefinitionStep; import ca.uhn.fhir.batch2.model.JobWorkCursor; -import ca.uhn.fhir.util.Logs; import ca.uhn.fhir.model.api.IModelJson; +import ca.uhn.fhir.util.Logs; import org.slf4j.Logger; abstract class BaseDataSink implements IJobDataSink { private static final Logger ourLog = Logs.getBatchTroubleshootingLog(); private final String myInstanceId; - private final JobWorkCursor myJobWorkCursor; + private final JobWorkCursor myJobWorkCursor; private int myRecoveredErrorCount; protected final String myJobDefinitionId; + private IWarningProcessor myWarningProcessor; protected BaseDataSink(String theInstanceId, - JobWorkCursor theJobWorkCursor) { + JobWorkCursor theJobWorkCursor) { myInstanceId = theInstanceId; myJobWorkCursor = theJobWorkCursor; myJobDefinitionId = theJobWorkCursor.getJobDefinition().getJobDefinitionId(); @@ -48,13 +50,27 @@ abstract class BaseDataSink getTargetStep() { + public JobDefinitionStep getTargetStep() { return myJobWorkCursor.currentStep; } diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/StepExecutor.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/StepExecutor.java index 540ed474900..585e361bab6 100644 --- 
a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/StepExecutor.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/StepExecutor.java @@ -91,8 +91,8 @@ public class StepExecutor { if (theStepExecutionDetails.hasAssociatedWorkChunk()) { int recordsProcessed = outcome.getRecordsProcessed(); int recoveredErrorCount = theDataSink.getRecoveredErrorCount(); + WorkChunkCompletionEvent event = new WorkChunkCompletionEvent(chunkId, recordsProcessed, recoveredErrorCount, theDataSink.getRecoveredWarning()); - WorkChunkCompletionEvent event = new WorkChunkCompletionEvent(chunkId, recordsProcessed, recoveredErrorCount); myJobPersistence.onWorkChunkCompletion(event); } diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobInstance.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobInstance.java index d34f2497a06..e339cea80e1 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobInstance.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/JobInstance.java @@ -105,9 +105,10 @@ public class JobInstance implements IModelJson, IJobInstance { private int myErrorCount; @JsonProperty(value = "estimatedCompletion", access = JsonProperty.Access.READ_ONLY) private String myEstimatedTimeRemaining; - @JsonProperty(value = "report", access = JsonProperty.Access.READ_WRITE) private String myReport; + @JsonProperty(value = "warningMessages", access = JsonProperty.Access.READ_ONLY) + private String myWarningMessages; /** * Constructor @@ -141,6 +142,7 @@ public class JobInstance implements IModelJson, IJobInstance { setWorkChunksPurged(theJobInstance.isWorkChunksPurged()); setCurrentGatedStepId(theJobInstance.getCurrentGatedStepId()); setReport(theJobInstance.getReport()); + setWarningMessages(theJobInstance.getWarningMessages()); } @@ -336,6 +338,14 @@ public class JobInstance implements IModelJson, IJobInstance { return this; } + public String 
getWarningMessages() { + return myWarningMessages; + } + + public JobInstance setWarningMessages(String theWarningMessages) { + myWarningMessages = theWarningMessages; + return this; + } public void setJobDefinition(JobDefinition theJobDefinition) { setJobDefinitionId(theJobDefinition.getJobDefinitionId()); setJobDefinitionVersion(theJobDefinition.getJobDefinitionVersion()); @@ -379,6 +389,7 @@ public class JobInstance implements IModelJson, IJobInstance { .append("errorCount", myErrorCount) .append("estimatedTimeRemaining", myEstimatedTimeRemaining) .append("report", myReport) + .append("warningMessages", myWarningMessages) .toString(); } diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/WorkChunk.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/WorkChunk.java index 50a3b8973c0..0d888ad1fb6 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/WorkChunk.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/WorkChunk.java @@ -94,6 +94,9 @@ public class WorkChunk implements IModelJson { @JsonProperty(value = "errorCount", access = JsonProperty.Access.READ_ONLY) private int myErrorCount; + @JsonProperty(value = "warningMessage", access = JsonProperty.Access.READ_ONLY) + private String myWarningMessage; + /** * Constructor */ @@ -246,6 +249,15 @@ public class WorkChunk implements IModelJson { myUpdateTime = theUpdateTime; } + public String getWarningMessage() { + return myWarningMessage; + } + + public WorkChunk setWarningMessage(String theWarningMessage) { + myWarningMessage = theWarningMessage; + return this; + } + @Override public String toString() { ToStringBuilder b = new ToStringBuilder(this); @@ -268,6 +280,9 @@ public class WorkChunk implements IModelJson { if (myErrorCount > 0) { b.append("ErrorCount", myErrorCount); } + if (isNotBlank(myWarningMessage)) { + b.append("WarningMessage", myWarningMessage); + } return b.toString(); } } diff --git 
a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/WorkChunkCompletionEvent.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/WorkChunkCompletionEvent.java index ffd578d5c99..1f1100784a3 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/WorkChunkCompletionEvent.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/model/WorkChunkCompletionEvent.java @@ -28,6 +28,7 @@ import org.apache.commons.lang3.builder.HashCodeBuilder; public class WorkChunkCompletionEvent extends BaseWorkChunkEvent { int myRecordsProcessed; int myRecoveredErrorCount; + String myRecoveredWarningMessage; public WorkChunkCompletionEvent(String theChunkId, int theRecordsProcessed, int theRecoveredErrorCount) { super(theChunkId); @@ -35,6 +36,11 @@ public class WorkChunkCompletionEvent extends BaseWorkChunkEvent { myRecoveredErrorCount = theRecoveredErrorCount; } + public WorkChunkCompletionEvent(String theChunkId, int theRecordsProcessed, int theRecoveredErrorCount, String theRecoveredWarningMessage) { + this(theChunkId, theRecordsProcessed, theRecoveredErrorCount); + myRecoveredWarningMessage = theRecoveredWarningMessage; + } + public int getRecordsProcessed() { return myRecordsProcessed; } @@ -43,6 +49,10 @@ public class WorkChunkCompletionEvent extends BaseWorkChunkEvent { return myRecoveredErrorCount; } + public String getRecoveredWarningMessage() { + return myRecoveredWarningMessage; + } + @Override public boolean equals(Object theO) { if (this == theO) return true; @@ -51,11 +61,11 @@ public class WorkChunkCompletionEvent extends BaseWorkChunkEvent { WorkChunkCompletionEvent that = (WorkChunkCompletionEvent) theO; - return new EqualsBuilder().appendSuper(super.equals(theO)).append(myRecordsProcessed, that.myRecordsProcessed).append(myRecoveredErrorCount, that.myRecoveredErrorCount).isEquals(); + return new EqualsBuilder().appendSuper(super.equals(theO)).append(myRecordsProcessed, 
that.myRecordsProcessed).append(myRecoveredErrorCount, that.myRecoveredErrorCount).append(myRecoveredWarningMessage, that.myRecoveredWarningMessage).isEquals(); } @Override public int hashCode() { - return new HashCodeBuilder(17, 37).appendSuper(super.hashCode()).append(myRecordsProcessed).append(myRecoveredErrorCount).toHashCode(); + return new HashCodeBuilder(17, 37).appendSuper(super.hashCode()).append(myRecordsProcessed).append(myRecoveredErrorCount).append(myRecoveredWarningMessage).toHashCode(); } } diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java index a90e9a7d256..6d8b5e631b8 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java @@ -30,7 +30,9 @@ import org.slf4j.Logger; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; public class InstanceProgress { @@ -50,10 +52,13 @@ public class InstanceProgress { private String myErrormessage = null; private StatusEnum myNewStatus = null; private final Map> myStepToStatusCountMap = new HashMap<>(); + private final Set myWarningMessages = new HashSet<>(); public void addChunk(WorkChunk theChunk) { myErrorCountForAllStatuses += theChunk.getErrorCount(); - + if (theChunk.getWarningMessage() != null) { + myWarningMessages.add(theChunk.getWarningMessage()); + } updateRecordsProcessed(theChunk); updateEarliestTime(theChunk); updateLatestEndTime(theChunk); @@ -119,6 +124,9 @@ public class InstanceProgress { public void updateInstance(JobInstance theInstance) { updateInstance(theInstance, false); + + String newWarningMessage = String.join("\n", myWarningMessages); + theInstance.setWarningMessages(newWarningMessage); } /**