4570 add support for warning messages to the batch framework (#4571)

* added failing test

* added support for warning messages to batch2

* added change log

* fixed errors from merge, made warning work with new batch2

* fixed message parse, changed interface method to default, formatting changes

* added changelog

* code review changes

* code review change

* added new implementation for warning message processing, fixed migration

* changed column to lob, as text is not well supported

* code review changes

---------

Co-authored-by: Steven Li <steven@smilecdr.com>
StevenXLi 2023-06-01 13:16:44 -04:00 committed by GitHub
parent 6c314edee9
commit 83411e585b
21 changed files with 265 additions and 42 deletions

File: changelog entry (new file)

@@ -0,0 +1,4 @@
---
type: add
issue: 4570
title: "The batch2 framework now supports warning messages."

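In practice, the pieces in this commit fit together as follows: a step worker registers an IWarningProcessor on its IJobDataSink, recovered (non-fatal) errors are offered to that processor, and the resulting warning is stored on the work chunk at completion and then aggregated onto the job instance. A minimal sketch of that pattern; MyStep, MyWarningProcessor, MyParams, and MyInput are hypothetical, while setWarningProcessor() and recoveredError() are the real IJobDataSink methods touched by this change:

    import ca.uhn.fhir.batch2.api.IJobDataSink;
    import ca.uhn.fhir.batch2.api.IJobStepWorker;
    import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
    import ca.uhn.fhir.batch2.api.RunOutcome;
    import ca.uhn.fhir.batch2.api.StepExecutionDetails;
    import ca.uhn.fhir.batch2.api.VoidModel;
    import ca.uhn.fhir.model.api.IModelJson;

    public class MyStep implements IJobStepWorker<MyStep.MyParams, MyStep.MyInput, VoidModel> {

        // Hypothetical model types; real jobs define richer JSON models.
        public static class MyParams implements IModelJson {}
        public static class MyInput implements IModelJson {}

        @Override
        public RunOutcome run(StepExecutionDetails<MyParams, MyInput> theDetails,
                              IJobDataSink<VoidModel> theDataSink) throws JobExecutionFailedException {
            // Hypothetical IWarningProcessor implementation; this commit's real
            // example is ReindexWarningProcessor, shown further down.
            theDataSink.setWarningProcessor(new MyWarningProcessor());

            try {
                // ... process theDetails.getData() ...
            } catch (Exception e) {
                // Non-fatal: counted as a recovered error, and also offered to the
                // warning processor, which may keep it as a warning. The warning is
                // persisted with the chunk and rolled up onto the JobInstance.
                theDataSink.recoveredError(e.getMessage());
            }
            return new RunOutcome(1);
        }
    }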
File: JobInstanceUtil.java

@@ -60,6 +60,7 @@ class JobInstanceUtil {
 		retVal.setCurrentGatedStepId(theEntity.getCurrentGatedStepId());
 		retVal.setReport(theEntity.getReport());
 		retVal.setEstimatedTimeRemaining(theEntity.getEstimatedTimeRemaining());
+		retVal.setWarningMessages(theEntity.getWarningMessages());
 		return retVal;
 	}

@@ -91,6 +92,7 @@ class JobInstanceUtil {
 		theJobInstanceEntity.setCurrentGatedStepId(theJobInstance.getCurrentGatedStepId());
 		theJobInstanceEntity.setReport(theJobInstance.getReport());
 		theJobInstanceEntity.setEstimatedTimeRemaining(theJobInstance.getEstimatedTimeRemaining());
+		theJobInstanceEntity.setWarningMessages(theJobInstance.getWarningMessages());
 	}

 	/**
@@ -118,6 +120,7 @@ class JobInstanceUtil {
 		retVal.setRecordsProcessed(theEntity.getRecordsProcessed());
 		// note: may be null out if queried NoData
 		retVal.setData(theEntity.getSerializedData());
+		retVal.setWarningMessage(theEntity.getWarningMessage());
 		return retVal;
 	}
 }

File: JpaJobPersistenceImpl.java

@@ -285,7 +285,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
 	@Override
 	@Transactional
 	public void onWorkChunkCompletion(WorkChunkCompletionEvent theEvent) {
-		myWorkChunkRepository.updateChunkStatusAndClearDataForEndSuccess(theEvent.getChunkId(), new Date(), theEvent.getRecordsProcessed(), theEvent.getRecoveredErrorCount(), WorkChunkStatusEnum.COMPLETED);
+		myWorkChunkRepository.updateChunkStatusAndClearDataForEndSuccess(theEvent.getChunkId(), new Date(), theEvent.getRecordsProcessed(), theEvent.getRecoveredErrorCount(), WorkChunkStatusEnum.COMPLETED, theEvent.getRecoveredWarningMessage());
 	}

 	@Nullable

File: IBatch2WorkChunkRepository.java

@@ -47,7 +47,7 @@ public interface IBatch2WorkChunkRepository extends JpaRepository<Batch2WorkChunkEntity, String> {
 	@Query("SELECT new Batch2WorkChunkEntity(" +
 		"e.myId, e.mySequence, e.myJobDefinitionId, e.myJobDefinitionVersion, e.myInstanceId, e.myTargetStepId, e.myStatus," +
 		"e.myCreateTime, e.myStartTime, e.myUpdateTime, e.myEndTime," +
-		"e.myErrorMessage, e.myErrorCount, e.myRecordsProcessed" +
+		"e.myErrorMessage, e.myErrorCount, e.myRecordsProcessed, e.myWarningMessage" +
 		") FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId ORDER BY e.mySequence ASC, e.myId ASC")
 	List<Batch2WorkChunkEntity> fetchChunksNoData(Pageable thePageRequest, @Param("instanceId") String theInstanceId);

@@ -59,10 +59,10 @@ public interface IBatch2WorkChunkRepository extends JpaRepository<Batch2WorkChunkEntity, String> {
 	@Modifying
 	@Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myEndTime = :et, " +
-		"e.myRecordsProcessed = :rp, e.myErrorCount = e.myErrorCount + :errorRetries, e.mySerializedData = null " +
-		"WHERE e.myId = :id")
+		"e.myRecordsProcessed = :rp, e.myErrorCount = e.myErrorCount + :errorRetries, e.mySerializedData = null, " +
+		"e.myWarningMessage = :warningMessage WHERE e.myId = :id")
 	void updateChunkStatusAndClearDataForEndSuccess(@Param("id") String theChunkId, @Param("et") Date theEndTime,
-		@Param("rp") int theRecordsProcessed, @Param("errorRetries") int theErrorRetries, @Param("status") WorkChunkStatusEnum theInProgress);
+		@Param("rp") int theRecordsProcessed, @Param("errorRetries") int theErrorRetries, @Param("status") WorkChunkStatusEnum theInProgress, @Param("warningMessage") String theWarningMessage);

 	@Modifying
 	@Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myEndTime = :et, e.mySerializedData = null, e.myErrorMessage = :em WHERE e.myId IN(:ids)")

File: Batch2JobInstanceEntity.java

@@ -113,6 +113,9 @@ public class Batch2JobInstanceEntity implements Serializable {
 	private String myEstimatedTimeRemaining;
 	@Column(name = "CUR_GATED_STEP_ID", length = ID_MAX_LENGTH, nullable = true)
 	private String myCurrentGatedStepId;
+	@Lob
+	@Column(name = "WARNING_MSG", nullable = true)
+	private String myWarningMessages;

 	/**
 	 * Any output from the job can be held in this column
@@ -292,6 +295,14 @@
 		myReport = theReport;
 	}

+	public String getWarningMessages() {
+		return myWarningMessages;
+	}
+
+	public void setWarningMessages(String theWarningMessages) {
+		myWarningMessages = theWarningMessages;
+	}
+
 	@Override
 	public String toString() {
 		return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
@@ -313,6 +324,7 @@
 			.append("errorMessage", myErrorMessage)
 			.append("estimatedTimeRemaining", myEstimatedTimeRemaining)
 			.append("report", myReport)
+			.append("warningMessages", myWarningMessages)
 			.toString();
 	}

File: Batch2WorkChunkEntity.java

@@ -96,7 +96,9 @@ public class Batch2WorkChunkEntity implements Serializable {
 	private String myErrorMessage;
 	@Column(name = "ERROR_COUNT", nullable = false)
 	private int myErrorCount;
+	@Lob
+	@Column(name = "WARNING_MSG", nullable = true)
+	private String myWarningMessage;

 	/**
 	 * Default constructor for Hibernate.
@@ -110,7 +112,7 @@
 	public Batch2WorkChunkEntity(String theId, int theSequence, String theJobDefinitionId, int theJobDefinitionVersion,
 								 String theInstanceId, String theTargetStepId, WorkChunkStatusEnum theStatus,
 								 Date theCreateTime, Date theStartTime, Date theUpdateTime, Date theEndTime,
-								 String theErrorMessage, int theErrorCount, Integer theRecordsProcessed) {
+								 String theErrorMessage, int theErrorCount, Integer theRecordsProcessed, String theWarningMessage) {
 		myId = theId;
 		mySequence = theSequence;
 		myJobDefinitionId = theJobDefinitionId;
@@ -125,6 +127,7 @@
 		myErrorMessage = theErrorMessage;
 		myErrorCount = theErrorCount;
 		myRecordsProcessed = theRecordsProcessed;
+		myWarningMessage = theWarningMessage;
 	}

 	public int getErrorCount() {
@@ -143,6 +146,14 @@
 		myErrorMessage = left(theErrorMessage, ERROR_MSG_MAX_LENGTH);
 	}

+	public String getWarningMessage() {
+		return myWarningMessage;
+	}
+
+	public void setWarningMessage(String theWarningMessage) {
+		myWarningMessage = theWarningMessage;
+	}
+
 	public int getSequence() {
 		return mySequence;
 	}
@@ -269,6 +280,7 @@
 			.append("serializedData", mySerializedData)
 			.append("status", myStatus)
 			.append("errorMessage", myErrorMessage)
+			.append("warningMessage", myWarningMessage)
 			.toString();
 	}

File: HapiFhirJpaMigrationTasks.java

@@ -45,7 +45,6 @@ import ca.uhn.fhir.util.ClasspathUtil;
 import ca.uhn.fhir.util.VersionEnum;
 import software.amazon.awssdk.utils.StringUtils;

-import javax.persistence.Index;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -103,14 +102,14 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
 	protected void init680() {
 		Builder version = forVersion(VersionEnum.V6_8_0);
 		// HAPI-FHIR #4801 - Add New Index On HFJ_RESOURCE
 		Builder.BuilderWithTableName resourceTable = version.onTable("HFJ_RESOURCE");

 		resourceTable
 			.addIndex("20230502.1", "IDX_RES_RESID_UPDATED")
 			.unique(false)
 			.online(true)
 			.withColumns("RES_ID", "RES_UPDATED", "PARTITION_ID");

 		Builder.BuilderWithTableName tagDefTable = version.onTable("HFJ_TAG_DEF");
 		tagDefTable.dropIndex("20230505.1", "IDX_TAGDEF_TYPESYSCODEVERUS");
@@ -158,6 +157,18 @@
 			.onTable("HFJ_RES_VER_PROV")
 			.dropIndex("20230523.1", "IDX_RESVERPROV_RESVER_PID");

+		// add warning message to batch job instance
+		version
+			.onTable("BT2_WORK_CHUNK")
+			.addColumn("20230524.1", "WARNING_MSG")
+			.nullable()
+			.type(ColumnTypeEnum.CLOB);
+
+		version
+			.onTable("BT2_JOB_INSTANCE")
+			.addColumn("20230524.2", "WARNING_MSG")
+			.nullable()
+			.type(ColumnTypeEnum.CLOB);
 	}

 	protected void init660() {
@@ -218,12 +229,12 @@
 			.type(ColumnTypeEnum.STRING, UUID_LENGTH);

-		Builder.BuilderAddTableByColumns resSearchUrlTable = version.addTableByColumns("20230227.1","HFJ_RES_SEARCH_URL", "RES_SEARCH_URL");
+		Builder.BuilderAddTableByColumns resSearchUrlTable = version.addTableByColumns("20230227.1", "HFJ_RES_SEARCH_URL", "RES_SEARCH_URL");

-		resSearchUrlTable.addColumn( "RES_SEARCH_URL").nonNullable().type(ColumnTypeEnum.STRING, 768);
-		resSearchUrlTable.addColumn( "RES_ID").nonNullable().type(ColumnTypeEnum.LONG);
-		resSearchUrlTable.addColumn( "CREATED_TIME").nonNullable().type(ColumnTypeEnum.DATE_TIMESTAMP);
+		resSearchUrlTable.addColumn("RES_SEARCH_URL").nonNullable().type(ColumnTypeEnum.STRING, 768);
+		resSearchUrlTable.addColumn("RES_ID").nonNullable().type(ColumnTypeEnum.LONG);
+		resSearchUrlTable.addColumn("CREATED_TIME").nonNullable().type(ColumnTypeEnum.DATE_TIMESTAMP);

 		resSearchUrlTable.addIndex("20230227.2", "IDX_RESSEARCHURL_RES").unique(false).withColumns("RES_ID");
 		resSearchUrlTable.addIndex("20230227.3", "IDX_RESSEARCHURL_TIME").unique(false).withColumns("CREATED_TIME");
@@ -317,16 +328,16 @@
 			version
 				.onTable(enversMpiLinkAuditTable)
 				.addColumn("20230316.4", "PARTITION_DATE")
 				.nullable()
 				.type(ColumnTypeEnum.DATE_ONLY);
 		}

 		version
 			.onTable(ResourceTable.HFJ_RESOURCE)
 			.addColumn("20230323.1", "SEARCH_URL_PRESENT")
 			.nullable()
 			.type(ColumnTypeEnum.BOOLEAN);

 		{
@@ -335,12 +346,12 @@
 				.addIndex("20230324.1", "IDX_SP_URI_HASH_URI_V2")
 				.unique(true)
 				.online(true)
-				.withColumns("HASH_URI","RES_ID","PARTITION_ID");
+				.withColumns("HASH_URI", "RES_ID", "PARTITION_ID");
 			uriTable
 				.addIndex("20230324.2", "IDX_SP_URI_HASH_IDENTITY_V2")
 				.unique(true)
 				.online(true)
-				.withColumns("HASH_IDENTITY","SP_URI","RES_ID","PARTITION_ID");
+				.withColumns("HASH_IDENTITY", "SP_URI", "RES_ID", "PARTITION_ID");
 			uriTable.dropIndex("20230324.3", "IDX_SP_URI_RESTYPE_NAME");
 			uriTable.dropIndex("20230324.4", "IDX_SP_URI_UPDATED");
 			uriTable.dropIndex("20230324.5", "IDX_SP_URI");
@@ -382,7 +393,7 @@
 			.addIndex("20230424.1", "IDX_RL_TGT_v2")
 			.unique(false)
 			.online(true)
-			.withColumns("TARGET_RESOURCE_ID", "SRC_PATH", "SRC_RESOURCE_ID", "TARGET_RESOURCE_TYPE","PARTITION_ID");
+			.withColumns("TARGET_RESOURCE_ID", "SRC_PATH", "SRC_RESOURCE_ID", "TARGET_RESOURCE_TYPE", "PARTITION_ID");

 		// drop and recreate FK_SPIDXSTR_RESOURCE since it will be useing the old IDX_SP_STRING_RESID
 		linkTable.dropForeignKey("20230424.2", "FK_RESLINK_TARGET", "HFJ_RESOURCE");

File: ReindexJobTest.java

@@ -10,6 +10,7 @@ import ca.uhn.fhir.interceptor.api.IAnonymousInterceptor;
 import ca.uhn.fhir.interceptor.api.Pointcut;
 import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
 import ca.uhn.fhir.jpa.api.dao.ReindexParameters;
+import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
 import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
 import ca.uhn.fhir.jpa.model.entity.ResourceHistoryTable;
 import ca.uhn.fhir.jpa.model.entity.ResourceTable;
@@ -367,6 +368,23 @@
 		assertThat(myReindexTestHelper.getAlleleObservationIds(), hasSize(50));
 	}

+	@Test
+	public void testReindex_DuplicateResourceBeforeEnforceUniqueShouldSaveWarning() {
+		myReindexTestHelper.createObservationWithCode();
+		myReindexTestHelper.createObservationWithCode();
+		DaoMethodOutcome searchParameter = myReindexTestHelper.createUniqueCodeSearchParameter();
+
+		JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
+		startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
+		startRequest.setParameters(new ReindexJobParameters());
+		Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(startRequest);
+		JobInstance myJob = myBatch2JobHelper.awaitJobCompletion(startResponse);
+
+		assertEquals(StatusEnum.COMPLETED, myJob.getStatus());
+		assertNotNull(myJob.getWarningMessages());
+		assertTrue(myJob.getWarningMessages().contains("Failed to reindex resource because unique search parameter " + searchParameter.getEntity().getIdDt().toVersionless().toString()));
+	}
+
 	@Test
 	public void testReindex_ExceptionThrownDuringWrite() {

File: ReindexTestHelper.java

@@ -15,7 +15,11 @@ import ca.uhn.fhir.util.BundleUtil;
 import org.hl7.fhir.instance.model.api.IBaseBundle;
 import org.hl7.fhir.instance.model.api.IBaseResource;
 import org.hl7.fhir.instance.model.api.IIdType;
+import org.hl7.fhir.r4.model.BooleanType;
+import org.hl7.fhir.r4.model.CodeableConcept;
+import org.hl7.fhir.r4.model.Coding;
 import org.hl7.fhir.r4.model.Enumerations;
+import org.hl7.fhir.r4.model.Extension;
 import org.hl7.fhir.r4.model.Observation;
 import org.hl7.fhir.r4.model.Patient;
 import org.hl7.fhir.r4.model.SearchParameter;
@@ -103,6 +107,36 @@
 		return daoMethodOutcome;
 	}

+	public DaoMethodOutcome createUniqueCodeSearchParameter() {
+		createCodeSearchParameter();
+		SearchParameter uniqueCodeSp = new SearchParameter();
+		uniqueCodeSp.setId("SearchParameter/unique-code");
+		uniqueCodeSp.addExtension(new Extension().setUrl("http://hapifhir.io/fhir/StructureDefinition/sp-unique").setValue(new BooleanType(true)));
+		uniqueCodeSp.setStatus(Enumerations.PublicationStatus.ACTIVE);
+		uniqueCodeSp.setCode("observation-code");
+		uniqueCodeSp.addBase("Observation");
+		uniqueCodeSp.setType(Enumerations.SearchParamType.COMPOSITE);
+		uniqueCodeSp.setExpression("Observation");
+		uniqueCodeSp.addComponent(new SearchParameter.SearchParameterComponentComponent().setDefinition("SearchParameter/clinical-code").setExpression("Observation"));
+
+		DaoMethodOutcome daoMethodOutcome = mySearchParameterDao.update(uniqueCodeSp);
+		mySearchParamRegistry.forceRefresh();
+		return daoMethodOutcome;
+	}
+
+	public DaoMethodOutcome createCodeSearchParameter() {
+		SearchParameter codeSp = new SearchParameter();
+		codeSp.setId("SearchParameter/clinical-code");
+		codeSp.setStatus(Enumerations.PublicationStatus.ACTIVE);
+		codeSp.setCode("code");
+		codeSp.addBase("Observation");
+		codeSp.setType(Enumerations.SearchParamType.TOKEN);
+		codeSp.setExpression("Observation.code");
+
+		DaoMethodOutcome daoMethodOutcome = mySearchParameterDao.update(codeSp);
+		mySearchParamRegistry.forceRefresh();
+		return daoMethodOutcome;
+	}
+
 	public IIdType createObservationWithAlleleExtension(Observation.ObservationStatus theStatus) {
 		Observation observation = buildObservationWithAlleleExtension(theStatus);
@@ -117,6 +151,19 @@
 		return observation;
 	}

+	public IIdType createObservationWithCode() {
+		Observation observation = buildObservationWithCode();
+		return myObservationDao.create(observation).getId();
+	}
+
+	public Observation buildObservationWithCode() {
+		Observation observation = new Observation();
+		CodeableConcept codeableConcept = new CodeableConcept();
+		codeableConcept.addCoding(new Coding().setCode("29463-7").setSystem("http://loinc.org").setDisplay("Body Weight"));
+		observation.setCode(codeableConcept);
+		return observation;
+	}
+
 	public List<String> getEyeColourPatientIds() {
 		return getEyeColourPatientIds(EYECOLOUR_SP_CODE, null);
 	}

File: ReindexStep.java

@@ -19,9 +19,18 @@
  */
 package ca.uhn.fhir.batch2.jobs.reindex;

-import ca.uhn.fhir.batch2.api.*;
+import ca.uhn.fhir.batch2.api.IJobDataSink;
+import ca.uhn.fhir.batch2.api.IJobStepWorker;
+import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
+import ca.uhn.fhir.batch2.api.RunOutcome;
+import ca.uhn.fhir.batch2.api.StepExecutionDetails;
+import ca.uhn.fhir.batch2.api.VoidModel;
 import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
-import ca.uhn.fhir.jpa.api.dao.*;
+import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
+import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
+import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
+import ca.uhn.fhir.jpa.api.dao.ReindexOutcome;
+import ca.uhn.fhir.jpa.api.dao.ReindexParameters;
 import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
 import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
 import ca.uhn.fhir.parser.DataFormatException;
@@ -98,6 +107,7 @@ public class ReindexStep implements IJobStepWorker<ReindexJobParameters, ResourceIdListWorkChunkJson, VoidModel> {
 		myInstanceId = theInstanceId;
 		myChunkId = theChunkId;
 		myJobParameters = theJobParameters;
+		myDataSink.setWarningProcessor(new ReindexWarningProcessor());
 	}

 	@Override

File: ReindexWarningProcessor.java (new file)

@@ -0,0 +1,22 @@
package ca.uhn.fhir.batch2.jobs.reindex;

import ca.uhn.fhir.batch2.api.IWarningProcessor;

public class ReindexWarningProcessor implements IWarningProcessor {

	private String myRecoveredWarning;

	@Override
	public void recoverWarningMessage(String theErrorMessage) {
		// Save a non-fatal error as a warning. Currently only the unique search
		// parameter reindexing error on existing duplicates is supported.
		if (theErrorMessage.contains("Can not create resource") && theErrorMessage.contains("it would create a duplicate unique index matching query")) {
			String searchParamName = theErrorMessage.substring(theErrorMessage.indexOf("SearchParameter"), theErrorMessage.length() - 1);
			myRecoveredWarning = "Failed to reindex resource because unique search parameter " + searchParamName + " could not be enforced.";
		}
	}

	@Override
	public String getRecoveredWarningMessage() {
		return myRecoveredWarning;
	}
}
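To make the string handling in recoverWarningMessage() concrete, here is a small illustrative call. The sample error text is hypothetical, but shaped like the duplicate-unique-index failure the processor matches on:

    ReindexWarningProcessor processor = new ReindexWarningProcessor();

    // Hypothetical error text ending with "...(SearchParameter/unique-code)":
    processor.recoverWarningMessage(
        "Can not create resource as it would create a duplicate unique index matching query (SearchParameter/unique-code)");

    // substring() keeps everything from "SearchParameter" up to (but not including)
    // the trailing ")", so this prints:
    // Failed to reindex resource because unique search parameter SearchParameter/unique-code could not be enforced.
    System.out.println(processor.getRecoveredWarningMessage());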

File: IJobDataSink.java

@@ -68,4 +68,11 @@ public interface IJobDataSink<OT extends IModelJson> {
 	 */
 	void recoveredError(String theMessage);

+	/**
+	 * Step workers may invoke this method to register a processor for warning messages.
+	 *
+	 * @param theWarningProcessor The processor for the warning.
+	 */
+	void setWarningProcessor(IWarningProcessor theWarningProcessor);
 }

File: IJobInstance.java

@@ -54,6 +54,8 @@ public interface IJobInstance {
 	String getErrorMessage();

+	String getWarningMessages();
+
 	boolean isCancelled();

 	String getReport();

File: IJobPersistence.java

@@ -76,6 +76,7 @@ public interface IJobPersistence extends IWorkChunkPersistence {
 	/**
 	 * Fetches any existing jobs matching provided request parameters
+	 *
 	 */
 	// on implementations @Transactional(propagation = Propagation.REQUIRES_NEW)
 	List<JobInstance> fetchInstances(FetchJobInstancesRequest theRequest, int theStart, int theBatchSize);
@@ -97,19 +98,20 @@
 	/**
 	 * Fetch all job instances for a given job definition id
+	 *
 	 */
 	// on implementations @Transactional(propagation = Propagation.REQUIRES_NEW)
 	List<JobInstance> fetchInstancesByJobDefinitionId(String theJobDefinitionId, int theCount, int theStart);

 	/**
 	 * Fetches all job instances based on the JobFetchRequest
+	 *
 	 * @param theRequest - the job fetch request
 	 * @return - a page of job instances
 	 */
 	// on implementations @Transactional(propagation = Propagation.REQUIRES_NEW)
 	Page<JobInstance> fetchJobInstances(JobInstanceFetchRequest theRequest);

 	// on implementations @Transactional(propagation = Propagation.REQUIRES_NEW)
 	boolean canAdvanceInstanceToNextStep(String theInstanceId, String theCurrentStepId);
@@ -130,6 +132,7 @@
 	/**
 	 * Fetch all chunks for a given instance.
+	 *
 	 * @param theInstanceId - instance id
 	 * @param theWithData - whether or not to include the data
 	 * @return - an iterator for fetching work chunks
@@ -233,8 +236,6 @@
 	@Transactional(propagation = Propagation.MANDATORY)
 	void updateInstanceUpdateTime(String theInstanceId);

 	/*
 	 * State transition events for job instances.
 	 * These cause the transitions along {@link ca.uhn.fhir.batch2.model.StatusEnum}

File: IWarningProcessor.java (new file)

@@ -0,0 +1,14 @@
package ca.uhn.fhir.batch2.api;

public interface IWarningProcessor {

	/**
	 * The data sink may invoke this method to indicate that an error occurred during
	 * work chunk processing but that it is non-fatal and should be saved as a warning.
	 *
	 * @param theErrorMessage An error message to be processed.
	 */
	public void recoverWarningMessage(String theErrorMessage);

	public String getRecoveredWarningMessage();
}
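The contract is intentionally small: the data sink feeds every recovered error through recoverWarningMessage(), and the coordinator later reads back a single aggregate string. As a minimal sketch of an alternative implementation (AccumulatingWarningProcessor is hypothetical, not part of this commit), a processor could keep every message rather than just one:

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical processor that keeps all recovered errors as warnings.
    public class AccumulatingWarningProcessor implements IWarningProcessor {

        private final List<String> myWarnings = new ArrayList<>();

        @Override
        public void recoverWarningMessage(String theErrorMessage) {
            myWarnings.add(theErrorMessage);
        }

        @Override
        public String getRecoveredWarningMessage() {
            // Join on newline, mirroring how InstanceProgress joins chunk warnings.
            return myWarnings.isEmpty() ? null : String.join("\n", myWarnings);
        }
    }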

File: BaseDataSink.java

@@ -20,22 +20,24 @@
 package ca.uhn.fhir.batch2.coordinator;

 import ca.uhn.fhir.batch2.api.IJobDataSink;
+import ca.uhn.fhir.batch2.api.IWarningProcessor;
 import ca.uhn.fhir.batch2.model.JobDefinitionStep;
 import ca.uhn.fhir.batch2.model.JobWorkCursor;
-import ca.uhn.fhir.util.Logs;
 import ca.uhn.fhir.model.api.IModelJson;
+import ca.uhn.fhir.util.Logs;
 import org.slf4j.Logger;

 abstract class BaseDataSink<PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> implements IJobDataSink<OT> {
 	private static final Logger ourLog = Logs.getBatchTroubleshootingLog();

 	private final String myInstanceId;
-	private final JobWorkCursor<PT,IT,OT> myJobWorkCursor;
+	private final JobWorkCursor<PT, IT, OT> myJobWorkCursor;
 	private int myRecoveredErrorCount;
 	protected final String myJobDefinitionId;
+	private IWarningProcessor myWarningProcessor;

 	protected BaseDataSink(String theInstanceId,
-						   JobWorkCursor<PT,IT,OT> theJobWorkCursor) {
+						   JobWorkCursor<PT, IT, OT> theJobWorkCursor) {
 		myInstanceId = theInstanceId;
 		myJobWorkCursor = theJobWorkCursor;
 		myJobDefinitionId = theJobWorkCursor.getJobDefinition().getJobDefinitionId();
@@ -48,13 +50,27 @@
 	@Override
 	public void recoveredError(String theMessage) {
 		ourLog.error("Error during job[{}] step[{}]: {}", myInstanceId, myJobWorkCursor.getCurrentStepId(), theMessage);
+		if (myWarningProcessor != null) {
+			myWarningProcessor.recoverWarningMessage(theMessage);
+		}
 		myRecoveredErrorCount++;
 	}

+	public void setWarningProcessor(IWarningProcessor theWarningProcessor) {
+		myWarningProcessor = theWarningProcessor;
+	}
+
 	public int getRecoveredErrorCount() {
 		return myRecoveredErrorCount;
 	}

+	public String getRecoveredWarning() {
+		if (myWarningProcessor != null) {
+			return myWarningProcessor.getRecoveredWarningMessage();
+		}
+		return null;
+	}
+
 	public abstract int getWorkChunkCount();

 	public boolean firstStepProducedNothing() {
@@ -65,7 +81,7 @@
 		return getWorkChunkCount() == 1;
 	}

-	public JobDefinitionStep<PT,IT,OT> getTargetStep() {
+	public JobDefinitionStep<PT, IT, OT> getTargetStep() {
 		return myJobWorkCursor.currentStep;
 	}

File: StepExecutor.java

@@ -91,8 +91,8 @@ public class StepExecutor {
 		if (theStepExecutionDetails.hasAssociatedWorkChunk()) {
 			int recordsProcessed = outcome.getRecordsProcessed();
 			int recoveredErrorCount = theDataSink.getRecoveredErrorCount();
-			WorkChunkCompletionEvent event = new WorkChunkCompletionEvent(chunkId, recordsProcessed, recoveredErrorCount);
+			WorkChunkCompletionEvent event = new WorkChunkCompletionEvent(chunkId, recordsProcessed, recoveredErrorCount, theDataSink.getRecoveredWarning());
 			myJobPersistence.onWorkChunkCompletion(event);
 		}

File: JobInstance.java

@@ -105,9 +105,10 @@ public class JobInstance implements IModelJson, IJobInstance {
 	private int myErrorCount;
 	@JsonProperty(value = "estimatedCompletion", access = JsonProperty.Access.READ_ONLY)
 	private String myEstimatedTimeRemaining;
 	@JsonProperty(value = "report", access = JsonProperty.Access.READ_WRITE)
 	private String myReport;
+	@JsonProperty(value = "warningMessages", access = JsonProperty.Access.READ_ONLY)
+	private String myWarningMessages;

 	/**
 	 * Constructor
@@ -141,6 +142,7 @@
 		setWorkChunksPurged(theJobInstance.isWorkChunksPurged());
 		setCurrentGatedStepId(theJobInstance.getCurrentGatedStepId());
 		setReport(theJobInstance.getReport());
+		setWarningMessages(theJobInstance.getWarningMessages());
 	}

@@ -336,6 +338,14 @@
 		return this;
 	}

+	public String getWarningMessages() {
+		return myWarningMessages;
+	}
+
+	public JobInstance setWarningMessages(String theWarningMessages) {
+		myWarningMessages = theWarningMessages;
+		return this;
+	}
+
 	public void setJobDefinition(JobDefinition<?> theJobDefinition) {
 		setJobDefinitionId(theJobDefinition.getJobDefinitionId());
 		setJobDefinitionVersion(theJobDefinition.getJobDefinitionVersion());
@@ -379,6 +389,7 @@
 			.append("errorCount", myErrorCount)
 			.append("estimatedTimeRemaining", myEstimatedTimeRemaining)
 			.append("report", myReport)
+			.append("warningMessages", myWarningMessages)
 			.toString();
 	}

File: WorkChunk.java

@@ -94,6 +94,9 @@ public class WorkChunk implements IModelJson {
 	@JsonProperty(value = "errorCount", access = JsonProperty.Access.READ_ONLY)
 	private int myErrorCount;

+	@JsonProperty(value = "warningMessage", access = JsonProperty.Access.READ_ONLY)
+	private String myWarningMessage;
+
 	/**
 	 * Constructor
 	 */
@@ -246,6 +249,15 @@
 		myUpdateTime = theUpdateTime;
 	}

+	public String getWarningMessage() {
+		return myWarningMessage;
+	}
+
+	public WorkChunk setWarningMessage(String theWarningMessage) {
+		myWarningMessage = theWarningMessage;
+		return this;
+	}
+
 	@Override
 	public String toString() {
 		ToStringBuilder b = new ToStringBuilder(this);
@@ -268,6 +280,9 @@
 		if (myErrorCount > 0) {
 			b.append("ErrorCount", myErrorCount);
 		}
+		if (isNotBlank(myWarningMessage)) {
+			b.append("WarningMessage", myWarningMessage);
+		}
 		return b.toString();
 	}
 }

File: WorkChunkCompletionEvent.java

@@ -28,6 +28,7 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
 public class WorkChunkCompletionEvent extends BaseWorkChunkEvent {
 	int myRecordsProcessed;
 	int myRecoveredErrorCount;
+	String myRecoveredWarningMessage;

 	public WorkChunkCompletionEvent(String theChunkId, int theRecordsProcessed, int theRecoveredErrorCount) {
 		super(theChunkId);
@@ -35,6 +36,11 @@
 		myRecoveredErrorCount = theRecoveredErrorCount;
 	}

+	public WorkChunkCompletionEvent(String theChunkId, int theRecordsProcessed, int theRecoveredErrorCount, String theRecoveredWarningMessage) {
+		this(theChunkId, theRecordsProcessed, theRecoveredErrorCount);
+		myRecoveredWarningMessage = theRecoveredWarningMessage;
+	}
+
 	public int getRecordsProcessed() {
 		return myRecordsProcessed;
 	}
@@ -43,6 +49,10 @@
 		return myRecoveredErrorCount;
 	}

+	public String getRecoveredWarningMessage() {
+		return myRecoveredWarningMessage;
+	}
+
 	@Override
 	public boolean equals(Object theO) {
 		if (this == theO) return true;
@@ -51,11 +61,11 @@
 		WorkChunkCompletionEvent that = (WorkChunkCompletionEvent) theO;

-		return new EqualsBuilder().appendSuper(super.equals(theO)).append(myRecordsProcessed, that.myRecordsProcessed).append(myRecoveredErrorCount, that.myRecoveredErrorCount).isEquals();
+		return new EqualsBuilder().appendSuper(super.equals(theO)).append(myRecordsProcessed, that.myRecordsProcessed).append(myRecoveredErrorCount, that.myRecoveredErrorCount).append(myRecoveredWarningMessage, that.myRecoveredWarningMessage).isEquals();
 	}

 	@Override
 	public int hashCode() {
-		return new HashCodeBuilder(17, 37).appendSuper(super.hashCode()).append(myRecordsProcessed).append(myRecoveredErrorCount).toHashCode();
+		return new HashCodeBuilder(17, 37).appendSuper(super.hashCode()).append(myRecordsProcessed).append(myRecoveredErrorCount).append(myRecoveredWarningMessage).toHashCode();
 	}
 }

File: InstanceProgress.java

@@ -30,7 +30,9 @@ import org.slf4j.Logger;

 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;

 public class InstanceProgress {
@@ -50,10 +52,13 @@
 	private String myErrormessage = null;
 	private StatusEnum myNewStatus = null;
 	private final Map<String, Map<WorkChunkStatusEnum, Integer>> myStepToStatusCountMap = new HashMap<>();
+	private final Set<String> myWarningMessages = new HashSet<>();

 	public void addChunk(WorkChunk theChunk) {
 		myErrorCountForAllStatuses += theChunk.getErrorCount();
+		if (theChunk.getWarningMessage() != null) {
+			myWarningMessages.add(theChunk.getWarningMessage());
+		}
 		updateRecordsProcessed(theChunk);
 		updateEarliestTime(theChunk);
 		updateLatestEndTime(theChunk);
@@ -119,6 +124,9 @@
 	public void updateInstance(JobInstance theInstance) {
 		updateInstance(theInstance, false);
+
+		String newWarningMessage = String.join("\n", myWarningMessages);
+		theInstance.setWarningMessages(newWarningMessage);
 	}

 	/**
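One consequence of collecting chunk warnings into a Set above: identical warnings from many chunks are reported once per job instance, joined on newline. A standalone illustration of that dedup-and-join behavior, with made-up messages:

    import java.util.HashSet;
    import java.util.Set;

    public class WarningDedupDemo {
        public static void main(String[] args) {
            Set<String> warnings = new HashSet<>();
            // Two chunks recovered the same warning; a third recovered a different one.
            warnings.add("Failed to reindex resource because unique search parameter SearchParameter/unique-code could not be enforced.");
            warnings.add("Failed to reindex resource because unique search parameter SearchParameter/unique-code could not be enforced.");
            warnings.add("some other non-fatal warning");

            // Mirrors InstanceProgress.updateInstance(): duplicates collapse,
            // remaining messages join on "\n". Prints two lines, not three.
            System.out.println(String.join("\n", warnings));
        }
    }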