Merge branch 'rel_6_4' into nd-4123-bulk-export-stuck-in-finalize-state

This commit is contained in:
nathaniel.doef 2023-02-03 23:31:37 -05:00
commit 87df691fc2
43 changed files with 610 additions and 155 deletions

View File

@ -0,0 +1,3 @@
type: fix
issue: 4486
title: "Previously, some MDM links of type `POSSIBLE_MATCH` were saved with unnormalized score values. This has been fixed."

View File

@ -0,0 +1,5 @@
---
type: fix
issue: 4491
title: "Batch2 Jobs in the FINALIZE state can now be
cancelled."

View File

@ -0,0 +1,8 @@
---
type: fix
issue: 4491
title: "Moved batch2 reduction step logic to the messaging queue.
Before it was executed during the maintenance run directly.
This resulted in bugs with multiple reduction steps kicking
off for long running jobs.
"

View File

@ -150,6 +150,9 @@ public class IdHelperService implements IIdHelperService<JpaPid> {
assert myDontCheckActiveTransactionForUnitTest || TransactionSynchronizationManager.isSynchronizationActive();
assert theRequestPartitionId != null;
if (theResourceId.contains("/")) {
theResourceId = theResourceId.substring(theResourceId.indexOf("/") + 1);
}
IdDt id = new IdDt(theResourceType, theResourceId);
Map<String, List<IResourceLookup<JpaPid>>> matches = translateForcedIdToPids(theRequestPartitionId,
Collections.singletonList(id),

View File

@ -73,11 +73,9 @@ public class MdmLinkDaoSvc<P extends IResourcePersistentId, M extends IMdmLink<P
mdmLink.setEidMatch(theMatchOutcome.isEidMatch() | mdmLink.isEidMatchPresent());
mdmLink.setHadToCreateNewGoldenResource(theMatchOutcome.isCreatedNewResource() | mdmLink.getHadToCreateNewGoldenResource());
mdmLink.setMdmSourceType(myFhirContext.getResourceType(theSourceResource));
if (mdmLink.getScore() != null) {
mdmLink.setScore(Math.max(theMatchOutcome.score, mdmLink.getScore()));
} else {
mdmLink.setScore(theMatchOutcome.score);
}
setScoreProperties(theMatchOutcome, mdmLink);
// Add partition for the mdm link if it's available in the source resource
RequestPartitionId partitionId = (RequestPartitionId) theSourceResource.getUserData(Constants.RESOURCE_PARTITION_ID);
if (partitionId != null && partitionId.getFirstPartitionIdOrNull() != null) {
@ -91,6 +89,24 @@ public class MdmLinkDaoSvc<P extends IResourcePersistentId, M extends IMdmLink<P
return mdmLink;
}
private void setScoreProperties(MdmMatchOutcome theMatchOutcome, M mdmLink) {
if (theMatchOutcome.getScore() != null) {
mdmLink.setScore( mdmLink.getScore() != null
? Math.max(theMatchOutcome.getNormalizedScore(), mdmLink.getScore())
: theMatchOutcome.getNormalizedScore() );
}
if (theMatchOutcome.getVector() != null) {
mdmLink.setVector( mdmLink.getVector() != null
? Math.max(theMatchOutcome.getVector(), mdmLink.getVector())
: theMatchOutcome.getVector() );
}
mdmLink.setRuleCount( mdmLink.getRuleCount() != null
? Math.max(theMatchOutcome.getMdmRuleCount(), mdmLink.getRuleCount())
: theMatchOutcome.getMdmRuleCount() );
}
@Nonnull
public M getOrCreateMdmLinkByGoldenResourceAndSourceResource(
IAnyResource theGoldenResource, IAnyResource theSourceResource
@ -127,7 +143,6 @@ public class MdmLinkDaoSvc<P extends IResourcePersistentId, M extends IMdmLink<P
* @param theSourceResourcePid The ResourcepersistenceId of the Source resource
* @return The {@link IMdmLink} entity that matches these criteria if exists
*/
@SuppressWarnings("unchecked")
public Optional<M> getLinkByGoldenResourcePidAndSourceResourcePid(P theGoldenResourcePid, P theSourceResourcePid) {
if (theSourceResourcePid == null || theGoldenResourcePid == null) {
return Optional.empty();

View File

@ -94,7 +94,7 @@ public class MdmMatchLinkSvc {
private void handleMdmWithMultipleCandidates(IAnyResource theResource, CandidateList theCandidateList, MdmTransactionContext theMdmTransactionContext) {
MatchedGoldenResourceCandidate firstMatch = theCandidateList.getFirstMatch();
IResourcePersistentId sampleGoldenResourcePid = firstMatch.getCandidateGoldenResourcePid();
IResourcePersistentId<?> sampleGoldenResourcePid = firstMatch.getCandidateGoldenResourcePid();
boolean allSameGoldenResource = theCandidateList.stream()
.allMatch(candidate -> candidate.getCandidateGoldenResourcePid().equals(sampleGoldenResourcePid));
@ -105,17 +105,7 @@ public class MdmMatchLinkSvc {
log(theMdmTransactionContext, "MDM received multiple match candidates, that were linked to different Golden Resources. Setting POSSIBLE_DUPLICATES and POSSIBLE_MATCHES.");
//Set them all as POSSIBLE_MATCH
List<IAnyResource> goldenResources = new ArrayList<>();
for (MatchedGoldenResourceCandidate matchedGoldenResourceCandidate : theCandidateList.getCandidates()) {
IAnyResource goldenResource = myMdmGoldenResourceFindingSvc
.getGoldenResourceFromMatchedGoldenResourceCandidate(matchedGoldenResourceCandidate, theMdmTransactionContext.getResourceType());
MdmMatchOutcome outcome = new MdmMatchOutcome(matchedGoldenResourceCandidate.getMatchResult().vector,
matchedGoldenResourceCandidate.getMatchResult().getNormalizedScore());
outcome.setMatchResultEnum(MdmMatchResultEnum.POSSIBLE_MATCH);
outcome.setEidMatch(theCandidateList.isEidMatch());
myMdmLinkSvc.updateLink(goldenResource, theResource, outcome, MdmLinkSourceEnum.AUTO, theMdmTransactionContext);
goldenResources.add(goldenResource);
}
List<IAnyResource> goldenResources = createPossibleMatches(theResource, theCandidateList, theMdmTransactionContext);
//Set all GoldenResources as POSSIBLE_DUPLICATE of the last GoldenResource.
IAnyResource firstGoldenResource = goldenResources.get(0);
@ -129,6 +119,26 @@ public class MdmMatchLinkSvc {
}
}
private List<IAnyResource> createPossibleMatches(IAnyResource theResource, CandidateList theCandidateList, MdmTransactionContext theMdmTransactionContext) {
List<IAnyResource> goldenResources = new ArrayList<>();
for (MatchedGoldenResourceCandidate matchedGoldenResourceCandidate : theCandidateList.getCandidates()) {
IAnyResource goldenResource = myMdmGoldenResourceFindingSvc
.getGoldenResourceFromMatchedGoldenResourceCandidate(matchedGoldenResourceCandidate, theMdmTransactionContext.getResourceType());
MdmMatchOutcome outcome = new MdmMatchOutcome(matchedGoldenResourceCandidate.getMatchResult().getVector(),
matchedGoldenResourceCandidate.getMatchResult().getScore())
.setMdmRuleCount( matchedGoldenResourceCandidate.getMatchResult().getMdmRuleCount());
outcome.setMatchResultEnum(MdmMatchResultEnum.POSSIBLE_MATCH);
outcome.setEidMatch(theCandidateList.isEidMatch());
myMdmLinkSvc.updateLink(goldenResource, theResource, outcome, MdmLinkSourceEnum.AUTO, theMdmTransactionContext);
goldenResources.add(goldenResource);
}
return goldenResources;
}
private void handleMdmWithNoCandidates(IAnyResource theResource, MdmTransactionContext theMdmTransactionContext) {
log(theMdmTransactionContext, String.format("There were no matched candidates for MDM, creating a new %s Golden Resource.", theResource.getIdElement().getResourceType()));
IAnyResource newGoldenResource = myGoldenResourceHelper.createGoldenResourceFromMdmSourceResource(theResource, theMdmTransactionContext);

View File

@ -100,7 +100,6 @@ public class MdmLinkDaoSvcTest extends BaseMdmR4Test {
mdmLink.setUpdated(new Date());
mdmLink.setGoldenResourcePersistenceId(JpaPid.fromId(thePatientPid));
mdmLink.setSourcePersistenceId(runInTransaction(()->myIdHelperService.getPidOrNull(RequestPartitionId.allPartitions(), patient)));
MdmLink saved= myMdmLinkDao.save(mdmLink);
return saved;
return myMdmLinkDao.save(mdmLink);
}
}

View File

@ -10,6 +10,7 @@ import ca.uhn.fhir.mdm.api.MdmMatchResultEnum;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.test.utilities.RangeTestHelper;
import ca.uhn.fhir.util.ParametersUtil;
import ca.uhn.fhir.util.StopWatch;
import org.apache.commons.lang3.StringUtils;
@ -377,7 +378,7 @@ public class MdmProviderQueryLinkR4Test extends BaseLinkR4Test {
List<Parameters.ParametersParameterComponent> list = getParametersByName(result, "link");
assertThat(list, hasSize(4));
List<Parameters.ParametersParameterComponent> part = list.get(3).getPart();
assertMdmLink(MDM_LINK_PROPERTY_COUNT, part, goldenResourceId.getValue(), patientId.getValue(), MdmMatchResultEnum.MATCH, "false", "false", "2");
assertMdmLink(MDM_LINK_PROPERTY_COUNT, part, goldenResourceId.getValue(), patientId.getValue(), MdmMatchResultEnum.MATCH, "false", "false", ".666");
}
@Test
@ -459,7 +460,7 @@ public class MdmProviderQueryLinkR4Test extends BaseLinkR4Test {
assertThat(thePart.get(5).getValue().primitiveValue(), is(theNewGoldenResource));
assertThat(thePart.get(6).getName(), is("score"));
assertThat(thePart.get(6).getValue().primitiveValue(), is(theScore));
RangeTestHelper.checkInRange(theScore, thePart.get(6).getValue().primitiveValue());
}
}

View File

@ -16,6 +16,7 @@ import java.util.List;
import static ca.uhn.fhir.mdm.api.MdmMatchResultEnum.MATCH;
import static ca.uhn.fhir.mdm.api.MdmMatchResultEnum.NO_MATCH;
import static ca.uhn.fhir.mdm.api.MdmMatchResultEnum.POSSIBLE_MATCH;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@ -52,6 +53,23 @@ class MdmLinkUpdaterSvcImplTest extends BaseMdmR4Test {
assertLinksMatchedByEid(false, false);
}
@Test
public void testUpdateLinkPossibleMatchSavesNormalizedScore() {
final Patient goldenPatient = createGoldenPatient(buildJanePatient());
final Patient patient1 = createPatient(buildJanePatient());
buildUpdateLinkMdmTransactionContext();
MdmMatchOutcome matchOutcome = new MdmMatchOutcome(61L, 5.0).setMdmRuleCount(6).setMatchResultEnum(POSSIBLE_MATCH);
myMdmLinkDaoSvc.createOrUpdateLinkEntity(goldenPatient, patient1, matchOutcome, MdmLinkSourceEnum.MANUAL, createContextForCreate("Patient"));
final List<MdmLink> targets = myMdmLinkDaoSvc.findMdmLinksByGoldenResource(goldenPatient);
assertFalse(targets.isEmpty());
assertEquals(1, targets.size());
final MdmLink mdmLink = targets.get(0);
assertEquals(matchOutcome.getNormalizedScore(), mdmLink.getScore());
}
@Test
public void testUpdateLinkMatchAfterVersionChange() {
myMdmSettings.getMdmRules().setVersion("1");

View File

@ -95,6 +95,7 @@ public abstract class BaseResourceProviderDstu3Test extends BaseJpaDstu3Test {
myPort = myServer.getPort();
myServerBase = myServer.getBaseUrl();
myClient = myServer.getFhirClient();
myClient.setEncoding(EncodingEnum.JSON);
myRestServer = myServer.getRestfulServer();
myClient.getInterceptorService().unregisterInterceptorsIf(t -> t instanceof LoggingInterceptor);

View File

@ -430,7 +430,7 @@ public class ResourceProviderDstu3Test extends BaseResourceProviderDstu3Test {
String respString = myClient.transaction().withBundle(input).prettyPrint().execute();
ourLog.debug(respString);
Bundle bundle = myFhirContext.newXmlParser().parseResource(Bundle.class, respString);
Bundle bundle = myFhirContext.newJsonParser().parseResource(Bundle.class, respString);
IdType id = new IdType(bundle.getEntry().get(0).getResponse().getLocation());
Basic basic = myClient.read().resource(Basic.class).withId(id).execute();
@ -1098,7 +1098,7 @@ public class ResourceProviderDstu3Test extends BaseResourceProviderDstu3Test {
//@formatter:on
fail();
} catch (PreconditionFailedException e) {
assertEquals("HTTP 412 Precondition Failed: " + Msg.code(962) + "Failed to DELETE resource with match URL \"Patient?identifier=testDeleteConditionalMultiple\" because this search matched 2 resources",
assertEquals("HTTP 412 Precondition Failed: " + Msg.code(962) + "Failed to DELETE resource with match URL \"Patient?identifier=testDeleteConditionalMultiple&_format=json\" because this search matched 2 resources",
e.getMessage());
}

View File

@ -34,6 +34,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -45,11 +47,13 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static ca.uhn.fhir.batch2.config.BaseBatch2Config.CHANNEL_NAME;
import static ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor.MAX_CHUNK_ERROR_COUNT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@ -216,9 +220,44 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
myLastStepLatch.awaitExpected();
}
@Test
public void testJobDefinitionWithReductionStepIT() throws InterruptedException {
private void createThreeStepReductionJob(
String theJobId,
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> theFirstStep,
IJobStepWorker<TestJobParameters, FirstStepOutput, SecondStepOutput> theSecondStep,
IReductionStepWorker<TestJobParameters, SecondStepOutput, ReductionStepOutput> theReductionsStep
) {
// create job definition (it's the test method's name)
JobDefinition<? extends IModelJson> jd = JobDefinition.newBuilder()
.setJobDefinitionId(theJobId)
.setJobDescription("test job")
.setJobDefinitionVersion(TEST_JOB_VERSION)
.setParametersType(TestJobParameters.class)
.gatedExecution()
.addFirstStep(
FIRST_STEP_ID,
"Test first step",
FirstStepOutput.class,
theFirstStep
)
.addIntermediateStep("SECOND",
"Second step",
SecondStepOutput.class,
theSecondStep)
.addFinalReducerStep(
LAST_STEP_ID,
"Test last step",
ReductionStepOutput.class,
theReductionsStep
)
.build();
myJobDefinitionRegistry.addJobDefinition(jd);
}
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testJobDefinitionWithReductionStepIT(boolean theDelayReductionStepBool) throws InterruptedException {
// setup
String jobId = new Exception().getStackTrace()[0].getMethodName() + "_" + theDelayReductionStepBool;
String testInfo = "test";
AtomicInteger secondStepInt = new AtomicInteger();
@ -235,6 +274,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
SecondStepOutput output = new SecondStepOutput();
output.setValue(testInfo + secondStepInt.getAndIncrement());
sink.accept(output);
return RunOutcome.SUCCESS;
};
@ -243,63 +283,66 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
private final ArrayList<SecondStepOutput> myOutput = new ArrayList<>();
private final AtomicBoolean myBoolean = new AtomicBoolean();
private final AtomicInteger mySecondGate = new AtomicInteger();
@Override
public ChunkOutcome consume(ChunkExecutionDetails<TestJobParameters, SecondStepOutput> theChunkDetails) {
myOutput.add(theChunkDetails.getData());
// 1 because we know 2 packets are coming.
// we'll fire the second maintenance run on the second packet
// which should cause multiple maintenance runs to run simultaneously
if (theDelayReductionStepBool && mySecondGate.getAndIncrement() == 1) {
ourLog.info("SECOND FORCED MAINTENANCE PASS FORCED");
myBatch2JobHelper.forceRunMaintenancePass();
}
return ChunkOutcome.SUCCESS();
}
@Nonnull
@Override
public RunOutcome run(@Nonnull StepExecutionDetails<TestJobParameters, SecondStepOutput> theStepExecutionDetails,
@Nonnull IJobDataSink<ReductionStepOutput> theDataSink) throws JobExecutionFailedException {
theDataSink.accept(new ReductionStepOutput(myOutput));
callLatch(myLastStepLatch, theStepExecutionDetails);
public RunOutcome run(
@Nonnull StepExecutionDetails<TestJobParameters, SecondStepOutput> theStepExecutionDetails,
@Nonnull IJobDataSink<ReductionStepOutput> theDataSink
) throws JobExecutionFailedException {
boolean isRunAlready = myBoolean.getAndSet(true);
assertFalse(isRunAlready, "Reduction step should only be called once!");
complete(theStepExecutionDetails, theDataSink);
return RunOutcome.SUCCESS;
}
};
// create job definition
String jobId = new Exception().getStackTrace()[0].getMethodName();
JobDefinition<? extends IModelJson> jd = JobDefinition.newBuilder()
.setJobDefinitionId(jobId)
.setJobDescription("test job")
.setJobDefinitionVersion(TEST_JOB_VERSION)
.setParametersType(TestJobParameters.class)
.gatedExecution()
.addFirstStep(
FIRST_STEP_ID,
"Test first step",
FirstStepOutput.class,
first
)
.addIntermediateStep("SECOND",
"Second step",
SecondStepOutput.class,
second)
.addFinalReducerStep(
LAST_STEP_ID,
"Test last step",
ReductionStepOutput.class,
last
)
.build();
myJobDefinitionRegistry.addJobDefinition(jd);
private void complete(
@Nonnull StepExecutionDetails<TestJobParameters, SecondStepOutput> theStepExecutionDetails,
@Nonnull IJobDataSink<ReductionStepOutput> theDataSink
) {
assertTrue(myBoolean.get());
theDataSink.accept(new ReductionStepOutput(myOutput));
callLatch(myLastStepLatch, theStepExecutionDetails);
}
};
createThreeStepReductionJob(jobId, first, second, last);
// run test
JobInstanceStartRequest request = buildRequest(jobId);
myFirstStepLatch.setExpectedCount(1);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(request);
String instanceId = startResponse.getJobId();
myFirstStepLatch.awaitExpected();
assertNotNull(instanceId);
myBatch2JobHelper.awaitGatedStepId(FIRST_STEP_ID, instanceId);
// wait for last step to finish
ourLog.info("Setting last step latch");
myLastStepLatch.setExpectedCount(1);
// waiting
myBatch2JobHelper.awaitJobCompletion(instanceId);
myLastStepLatch.awaitExpected();
ourLog.info("awaited the last step");
// verify
Optional<JobInstance> instanceOp = myJobPersistence.fetchInstance(instanceId);

View File

@ -47,8 +47,8 @@ public class DiffProviderR4Test extends BaseResourceProviderR4Test {
Assertions.assertEquals("replace", FhirPatchApplyR4Test.extractPartValuePrimitive(diff, 0, "operation", "type"));
Assertions.assertEquals("Patient.text.div", FhirPatchApplyR4Test.extractPartValuePrimitive(diff, 0, "operation", "path"));
Assertions.assertEquals("<div xmlns=\"http://www.w3.org/1999/xhtml\"><table class=\"hapiPropertyTable\"><tbody/></table></div>", FhirPatchApplyR4Test.extractPartValuePrimitive(diff, 0, "operation", "previousValue"));
Assertions.assertEquals("<div xmlns=\"http://www.w3.org/1999/xhtml\"><div class=\"hapiHeaderText\"><b>SMITH </b></div><table class=\"hapiPropertyTable\"><tbody/></table></div>", FhirPatchApplyR4Test.extractPartValuePrimitive(diff, 0, "operation", "value"));
Assertions.assertEquals("<div xmlns=\"http://www.w3.org/1999/xhtml\"><table class=\"hapiPropertyTable\"><tbody></tbody></table></div>", FhirPatchApplyR4Test.extractPartValuePrimitive(diff, 0, "operation", "previousValue"));
Assertions.assertEquals("<div xmlns=\"http://www.w3.org/1999/xhtml\"><div class=\"hapiHeaderText\"><b>SMITH </b></div><table class=\"hapiPropertyTable\"><tbody></tbody></table></div>", FhirPatchApplyR4Test.extractPartValuePrimitive(diff, 0, "operation", "value"));
Assertions.assertEquals("insert", FhirPatchApplyR4Test.extractPartValuePrimitive(diff, 1, "operation", "type"));
Assertions.assertEquals("Patient.name", FhirPatchApplyR4Test.extractPartValuePrimitive(diff, 1, "operation", "path"));
@ -86,8 +86,8 @@ public class DiffProviderR4Test extends BaseResourceProviderR4Test {
Assertions.assertEquals("replace", FhirPatchApplyR4Test.extractPartValuePrimitive(diff, 2, "operation", "type"));
Assertions.assertEquals("Patient.text.div", FhirPatchApplyR4Test.extractPartValuePrimitive(diff, 2, "operation", "path"));
Assertions.assertEquals("<div xmlns=\"http://www.w3.org/1999/xhtml\"><table class=\"hapiPropertyTable\"><tbody/></table></div>", FhirPatchApplyR4Test.extractPartValuePrimitive(diff, 2, "operation", "previousValue"));
Assertions.assertEquals("<div xmlns=\"http://www.w3.org/1999/xhtml\"><div class=\"hapiHeaderText\"><b>SMITH </b></div><table class=\"hapiPropertyTable\"><tbody/></table></div>", FhirPatchApplyR4Test.extractPartValuePrimitive(diff, 2, "operation", "value"));
Assertions.assertEquals("<div xmlns=\"http://www.w3.org/1999/xhtml\"><table class=\"hapiPropertyTable\"><tbody></tbody></table></div>", FhirPatchApplyR4Test.extractPartValuePrimitive(diff, 2, "operation", "previousValue"));
Assertions.assertEquals("<div xmlns=\"http://www.w3.org/1999/xhtml\"><div class=\"hapiHeaderText\"><b>SMITH </b></div><table class=\"hapiPropertyTable\"><tbody></tbody></table></div>", FhirPatchApplyR4Test.extractPartValuePrimitive(diff, 2, "operation", "value"));
Assertions.assertEquals("insert", FhirPatchApplyR4Test.extractPartValuePrimitive(diff, 3, "operation", "type"));
Assertions.assertEquals("Patient.name", FhirPatchApplyR4Test.extractPartValuePrimitive(diff, 3, "operation", "path"));

View File

@ -764,6 +764,8 @@ public class ResourceProviderR4Test extends BaseResourceProviderR4Test {
public void testUpdateResourceAfterReadOperationAndNoChangesShouldNotChangeVersion(){
// Create Patient
Patient patient = new Patient();
patient.getText().setDivAsString("<div xmlns=\"http://www.w3.org/1999/xhtml\">hello</div>");
patient = (Patient) myClient.create().resource(patient).execute().getResource();
assertEquals(1, patient.getIdElement().getVersionIdPartAsLong());

View File

@ -477,6 +477,7 @@ public class TerminologySvcImplR4Test extends BaseTermR4Test {
myCodeSystemDao.update(codeSystem, mySrd);
await().until(() -> {
myBatch2JobHelper.runMaintenancePass();
myTerminologyDeferredStorageSvc.saveAllDeferred();
return myTerminologyDeferredStorageSvc.isStorageQueueEmpty(true);
}, equalTo(true));

View File

@ -227,19 +227,12 @@ public class ResourceProviderR5Test extends BaseResourceProviderR5Test {
try (CloseableHttpResponse resp = ourHttpClient.execute(post)) {
String respString = IOUtils.toString(resp.getEntity().getContent(), Charsets.UTF_8);
ourLog.debug(respString);
// assertEquals(200, resp.getStatusLine().getStatusCode());
assertEquals(200, resp.getStatusLine().getStatusCode());
// As of 2023-01-26, the above line was restored.
// As of 2021-12-28, the R5 structures return a version string that isn't
// actually in the fhirVersion ValueSet. If this stops being the case this
// test will fail and the line above should be restored
OperationOutcome oo = myFhirCtx.newJsonParser().parseResource(OperationOutcome.class, respString);
assertEquals(1, oo.getIssue().size());
// assertThat(oo.getIssue().get(0).getDiagnostics(), containsString("is not in the value set 'FHIRVersion'"));
//As of 2022-10-06, the error is now that RequestGroup is not in the resourcetypes valueset, (though it is).
//TODO JA: I'm not sure if i have to update this valueset somewhere? the linked valueset _does_ contain the resource type.
assertThat(oo.getIssue().get(0).getDiagnostics(), containsString("is not in the value set 'Resource Types'"));
}
}

View File

@ -71,7 +71,6 @@ public class Batch2JobHelper {
return awaitJobHasStatusWithoutMaintenancePass(theBatchJobId, StatusEnum.COMPLETED);
}
public JobInstance awaitJobCancelled(String theBatchJobId) {
return awaitJobHasStatus(theBatchJobId, StatusEnum.CANCELLED);
}
@ -106,7 +105,6 @@ public class Batch2JobHelper {
return myJobCoordinator.getInstance(theBatchJobId);
}
public JobInstance awaitJobawaitJobHasStatusWithoutMaintenancePass(String theBatchJobId, int theSecondsToWait, StatusEnum... theExpectedStatus) {
assert !TransactionSynchronizationManager.isActualTransactionActive();
@ -168,7 +166,6 @@ public class Batch2JobHelper {
public long getCombinedRecordsProcessed(String theJobId) {
JobInstance job = myJobCoordinator.getInstance(theJobId);
return job.getCombinedRecordsProcessed();
}
public void awaitAllJobsOfJobDefinitionIdToComplete(String theJobDefinitionId) {
@ -243,6 +240,14 @@ public class Batch2JobHelper {
myJobMaintenanceService.runMaintenancePass();
}
/**
* Forces a run of the maintenance pass without waiting for
* the semaphore to release
*/
public void forceRunMaintenancePass() {
myJobMaintenanceService.forceMaintenancePass();
}
public void cancelAllJobsAndAwaitCancellation() {
List<JobInstance> instances = myJobPersistence.fetchInstances(1000, 0);
for (JobInstance next : instances) {

View File

@ -36,13 +36,13 @@ public final class MdmMatchOutcome {
/**
* A bitmap that indicates which rules matched
*/
public final Long vector;
private final Long vector;
/**
* The sum of all scores for all rules evaluated. Similarity rules add the similarity score (between 0.0 and 1.0) whereas
* matcher rules add either a 0.0 or 1.0.
*/
public final Double score;
private final Double score;
/**
* Did the MDM match operation result in creating a new golden resource resource?
@ -134,6 +134,10 @@ public final class MdmMatchOutcome {
return this;
}
public Double getScore() { return score; }
public Long getVector() { return vector; }
/**
* Gets normalized score that is in the range from zero to one
*

View File

@ -89,12 +89,12 @@ public class MdmResourceMatcherSvc {
MdmMatchOutcome match(IBaseResource theLeftResource, IBaseResource theRightResource) {
MdmMatchOutcome matchResult = getMatchOutcome(theLeftResource, theRightResource);
MdmMatchResultEnum matchResultEnum = myMdmRulesJson.getMatchResult(matchResult.vector);
MdmMatchResultEnum matchResultEnum = myMdmRulesJson.getMatchResult(matchResult.getVector());
matchResult.setMatchResultEnum(matchResultEnum);
if (ourLog.isDebugEnabled()) {
ourLog.debug("{} {}: {}", matchResult.getMatchResultEnum(), theRightResource.getIdElement().toUnqualifiedVersionless(), matchResult);
if (ourLog.isTraceEnabled()) {
ourLog.trace("Field matcher results:\n{}", myMdmRulesJson.getDetailedFieldMatchResultWithSuccessInformation(matchResult.vector));
ourLog.trace("Field matcher results:\n{}", myMdmRulesJson.getDetailedFieldMatchResultWithSuccessInformation(matchResult.getVector()));
}
}
return matchResult;
@ -133,8 +133,8 @@ public class MdmResourceMatcherSvc {
ourLog.trace("Matcher {} is valid for resource type: {}. Evaluating match.", fieldComparator.getName(), resourceType);
MdmMatchEvaluation matchEvaluation = fieldComparator.match(theLeftResource, theRightResource);
if (matchEvaluation.match) {
vector |= (1 << i);
ourLog.trace("Match: Successfully matched matcher {} with score {}.", fieldComparator.getName(), matchEvaluation.score);
vector |= (1L << i);
ourLog.trace("Match: Successfully matched matcher {} with score {}. New vector: {}", fieldComparator.getName(), matchEvaluation.score, vector);
} else {
ourLog.trace("No match: Matcher {} did not match (score: {}).", fieldComparator.getName(), matchEvaluation.score);
}

View File

@ -43,8 +43,8 @@ public abstract class BaseR4Test {
}
protected void assertMatchResult(MdmMatchResultEnum theExpectedMatchEnum, long theExpectedVector, double theExpectedScore, boolean theExpectedNewGoldenResource, boolean theExpectedEidMatch, MdmMatchOutcome theMatchResult) {
assertEquals(theExpectedScore, theMatchResult.score, 0.001);
assertEquals(theExpectedVector, theMatchResult.vector);
assertEquals(theExpectedScore, theMatchResult.getScore(), 0.001);
assertEquals(theExpectedVector, theMatchResult.getVector());
assertEquals(theExpectedEidMatch, theMatchResult.isEidMatch());
assertEquals(theExpectedNewGoldenResource, theMatchResult.isCreatedNewResource());
assertEquals(theExpectedMatchEnum, theMatchResult.getMatchResultEnum());

View File

@ -0,0 +1,37 @@
package ca.uhn.fhir.batch2.jobs.export;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.jpa.api.svc.IBatch2JobRunner;
import ca.uhn.fhir.jpa.bulk.export.model.BulkExportJobStatusEnum;
import org.slf4j.Logger;
import static org.slf4j.LoggerFactory.getLogger;
public class BulkExportUtil {
private static final Logger ourLog = getLogger(BulkExportUtil.class);
private BulkExportUtil() {
}
/**
* Converts Batch2 StatusEnum -> BulkExportJobStatusEnum
*/
public static BulkExportJobStatusEnum fromBatchStatus(StatusEnum status) {
switch (status) {
case QUEUED:
case FINALIZE:
return BulkExportJobStatusEnum.SUBMITTED;
case COMPLETED :
return BulkExportJobStatusEnum.COMPLETE;
case IN_PROGRESS:
return BulkExportJobStatusEnum.BUILDING;
default:
ourLog.warn("Unrecognized status {}; treating as FAILED/CANCELLED/ERRORED", status.name());
case FAILED:
case CANCELLED:
case ERRORED:
return BulkExportJobStatusEnum.ERROR;
}
}
}

View File

@ -22,10 +22,10 @@ package ca.uhn.fhir.batch2.jobs.services;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.JobOperationResultJson;
import ca.uhn.fhir.batch2.jobs.export.BulkExportUtil;
import ca.uhn.fhir.batch2.jobs.export.models.BulkExportJobParameters;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.api.model.Batch2JobInfo;
import ca.uhn.fhir.jpa.api.model.Batch2JobOperationResult;
@ -33,7 +33,6 @@ import ca.uhn.fhir.jpa.api.model.BulkExportParameters;
import ca.uhn.fhir.jpa.api.svc.IBatch2JobRunner;
import ca.uhn.fhir.jpa.batch.models.Batch2BaseJobParameters;
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
import ca.uhn.fhir.jpa.bulk.export.model.BulkExportJobStatusEnum;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.util.Batch2JobDefinitionConstants;
import org.slf4j.Logger;
@ -97,7 +96,10 @@ public class Batch2JobRunnerImpl implements IBatch2JobRunner {
private Batch2JobInfo fromJobInstanceToBatch2JobInfo(@Nonnull JobInstance theInstance) {
Batch2JobInfo info = new Batch2JobInfo();
info.setJobId(theInstance.getInstanceId());
info.setStatus(fromBatchStatus(theInstance.getStatus()));
// should convert this to a more generic enum for all batch2 (which is what it seems like)
// or use the status enum only (combine with bulk export enum)
// on the Batch2JobInfo
info.setStatus(BulkExportUtil.fromBatchStatus(theInstance.getStatus()));
info.setCancelled(theInstance.isCancelled());
info.setStartTime(theInstance.getStartTime());
info.setEndTime(theInstance.getEndTime());
@ -106,22 +108,6 @@ public class Batch2JobRunnerImpl implements IBatch2JobRunner {
return info;
}
public static BulkExportJobStatusEnum fromBatchStatus(StatusEnum status) {
switch (status) {
case QUEUED:
return BulkExportJobStatusEnum.SUBMITTED;
case COMPLETED :
return BulkExportJobStatusEnum.COMPLETE;
case IN_PROGRESS:
return BulkExportJobStatusEnum.BUILDING;
case FAILED:
case CANCELLED:
case ERRORED:
default:
return BulkExportJobStatusEnum.ERROR;
}
}
private Batch2JobStartResponse startBatch2BulkExportJob(BulkExportParameters theParameters) {
JobInstanceStartRequest request = createStartRequest(theParameters);
request.setParameters(BulkExportJobParameters.createFromExportJobParameters(theParameters));

View File

@ -20,6 +20,8 @@ package ca.uhn.fhir.batch2.api;
* #L%
*/
import com.google.common.annotations.VisibleForTesting;
public interface IJobMaintenanceService {
/**
* Do not wait for the next scheduled time for maintenance. Trigger it immediately.
@ -29,4 +31,10 @@ public interface IJobMaintenanceService {
void runMaintenancePass();
/**
* Forces a second maintenance run.
* Only to be used in tests to simulate a long running maintenance step
*/
@VisibleForTesting
void forceMaintenancePass();
}

View File

@ -28,8 +28,8 @@ import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.Logs;
import org.slf4j.Logger;
import javax.annotation.Nonnull;
@ -53,7 +53,7 @@ public class JobStepExecutor<PT extends IModelJson, IT extends IModelJson, OT ex
JobStepExecutor(@Nonnull IJobPersistence theJobPersistence,
@Nonnull BatchJobSender theBatchJobSender,
@Nonnull JobInstance theInstance,
@Nonnull WorkChunk theWorkChunk,
WorkChunk theWorkChunk,
@Nonnull JobWorkCursor<PT, IT, OT> theCursor,
@Nonnull WorkChunkProcessor theExecutor, IJobMaintenanceService theJobMaintenanceService) {
myJobPersistence = theJobPersistence;

View File

@ -46,7 +46,7 @@ public class JobStepExecutorFactory {
myJobMaintenanceService = theJobMaintenanceService;
}
public <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> JobStepExecutor<PT,IT,OT> newJobStepExecutor(@Nonnull JobInstance theInstance, @Nonnull WorkChunk theWorkChunk, @Nonnull JobWorkCursor<PT, IT, OT> theCursor) {
public <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> JobStepExecutor<PT,IT,OT> newJobStepExecutor(@Nonnull JobInstance theInstance, WorkChunk theWorkChunk, @Nonnull JobWorkCursor<PT, IT, OT> theCursor) {
return new JobStepExecutor<>(myJobPersistence, myBatchJobSender, theInstance, theWorkChunk, theCursor, myJobStepExecutorSvc, myJobMaintenanceService);
}
}

View File

@ -28,8 +28,8 @@ import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.Logs;
import org.slf4j.Logger;
import java.util.ArrayList;
@ -55,10 +55,14 @@ public class ReductionStepExecutor {
) {
IReductionStepWorker<PT, IT, OT> reductionStepWorker = (IReductionStepWorker<PT, IT, OT>) theStep.getJobStepWorker();
// we mark it first so that no other maintenance passes will pick this job up!
// if we shut down mid process, though, it will be stuck in FINALIZE forever :(
if (!myJobPersistence.markInstanceAsStatus(theInstance.getInstanceId(), StatusEnum.FINALIZE)) {
ourLog.warn("JobInstance[{}] is already in FINALIZE state, no reducer action performed.", theInstance.getInstanceId());
ourLog.warn(
"JobInstance[{}] is already in FINALIZE state. In memory status is {}. Reduction step will not rerun!"
+ " This could be a long running reduction job resulting in the processed msg not being acknowledge,"
+ " or the result of a failed process or server restarting.",
theInstance.getInstanceId(),
theInstance.getStatus().name()
);
return false;
}
theInstance.setStatus(StatusEnum.FINALIZE);
@ -106,6 +110,8 @@ public class ReductionStepExecutor {
break;
case FAIL:
// non-idempotent; but failed chunks will be
// ignored on a second runthrough of reduction step
myJobPersistence.markWorkChunkAsFailed(chunk.getId(),
"Step worker failed to process work chunk " + chunk.getId());
retval = false;

View File

@ -32,8 +32,9 @@ import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.batch2.util.Batch2Constants;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.util.Logs;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.springframework.messaging.Message;
@ -75,17 +76,28 @@ class WorkChannelMessageHandler implements MessageHandler {
String chunkId = workNotification.getChunkId();
Validate.notNull(chunkId);
Optional<WorkChunk> chunkOpt = myJobPersistence.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId);
if (chunkOpt.isEmpty()) {
ourLog.error("Unable to find chunk with ID {} - Aborting", chunkId);
return;
boolean isReductionWorkNotification = Batch2Constants.REDUCTION_STEP_CHUNK_ID_PLACEHOLDER.equals(chunkId);
JobWorkCursor<?, ?, ?> cursor = null;
WorkChunk workChunk = null;
if (!isReductionWorkNotification) {
Optional<WorkChunk> chunkOpt = myJobPersistence.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId);
if (chunkOpt.isEmpty()) {
ourLog.error("Unable to find chunk with ID {} - Aborting", chunkId);
return;
}
workChunk = chunkOpt.get();
ourLog.debug("Worker picked up chunk. [chunkId={}, stepId={}, startTime={}]", chunkId, workChunk.getTargetStepId(), workChunk.getStartTime());
cursor = buildCursorFromNotification(workNotification);
Validate.isTrue(workChunk.getTargetStepId().equals(cursor.getCurrentStepId()), "Chunk %s has target step %s but expected %s", chunkId, workChunk.getTargetStepId(), cursor.getCurrentStepId());
} else {
ourLog.debug("Processing reduction step work notification. No associated workchunks.");
cursor = buildCursorFromNotification(workNotification);
}
WorkChunk workChunk = chunkOpt.get();
ourLog.debug("Worker picked up chunk. [chunkId={}, stepId={}, startTime={}]", chunkId, workChunk.getTargetStepId(), workChunk.getStartTime());
JobWorkCursor<?, ?, ?> cursor = buildCursorFromNotification(workNotification);
Validate.isTrue(workChunk.getTargetStepId().equals(cursor.getCurrentStepId()), "Chunk %s has target step %s but expected %s", chunkId, workChunk.getTargetStepId(), cursor.getCurrentStepId());
Optional<JobInstance> instanceOpt = myJobPersistence.fetchInstance(workNotification.getInstanceId());
JobInstance instance = instanceOpt.orElseThrow(() -> new InternalErrorException("Unknown instance: " + workNotification.getInstanceId()));

View File

@ -32,13 +32,12 @@ import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.Logs;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import javax.annotation.Nullable;
import java.util.Optional;
import static org.apache.commons.lang3.StringUtils.isBlank;
@ -102,7 +101,7 @@ public class WorkChunkProcessor {
boolean success = myReductionStepExecutor.executeReductionStep(theInstance, step, inputType, parameters);
if (success) {
// Now call call the normal step executor
// Now call the normal step executor
// the data sink stores the report on the instance (i.e. not chunks).
// Assume the OT (report) data is smaller than the list of all IT data
@ -113,7 +112,6 @@ public class WorkChunkProcessor {
}
return new JobStepExecutorOutput<>(success, dataSink);
} else {
// all other kinds of steps
Validate.notNull(theWorkChunk);

View File

@ -35,7 +35,7 @@ import org.slf4j.Logger;
import javax.annotation.Nonnull;
import java.util.Date;
import static ca.uhn.fhir.batch2.config.Batch2Constants.BATCH_START_DATE;
import static ca.uhn.fhir.batch2.util.Batch2Constants.BATCH_START_DATE;
public class GenerateRangeChunksStep<PT extends PartitionedUrlListJobParameters> implements IFirstJobStepWorker<PT, PartitionedUrlChunkRangeJson> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();

View File

@ -22,7 +22,6 @@ package ca.uhn.fhir.batch2.maintenance;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.coordinator.JobStepExecutorOutput;
import ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
@ -30,11 +29,11 @@ import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator;
import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
import ca.uhn.fhir.batch2.util.Batch2Constants;
import ca.uhn.fhir.util.Logs;
import org.apache.commons.lang3.time.DateUtils;
import org.slf4j.Logger;
import java.util.Date;
import java.util.List;
public class JobInstanceProcessor {
@ -125,6 +124,8 @@ public class JobInstanceProcessor {
private void triggerGatedExecutions() {
if (!myInstance.isRunning()) {
ourLog.debug("JobInstance {} is not in a \"running\" state. Status {}",
myInstance.getInstanceId(), myInstance.getStatus().name());
return;
}
@ -136,9 +137,12 @@ public class JobInstanceProcessor {
// final step
if (jobWorkCursor.isFinalStep() && !jobWorkCursor.isReductionStep()) {
ourLog.debug("Job instance {} is in final step and it's not a reducer step", myInstance.getInstanceId());
return;
}
// we should not be sending a second reduction step
// to the queue if it's in finalize status
if (jobWorkCursor.isReductionStep() && myInstance.getStatus() == StatusEnum.FINALIZE) {
ourLog.warn("Job instance {} is still finalizing - a second reduction job will not be started.", myInstance.getInstanceId());
return;
@ -179,16 +183,13 @@ public class JobInstanceProcessor {
myJobPersistence.updateInstance(myInstance);
}
private void processReductionStep(JobWorkCursor<?, ?, ?> jobWorkCursor) {
// do execution of the final step now
// (ie, we won't send to job workers)
JobStepExecutorOutput<?, ?, ?> result = myJobExecutorSvc.doExecution(
JobWorkCursor.fromJobDefinitionAndRequestedStepId(myInstance.getJobDefinition(), jobWorkCursor.nextStep.getStepId()),
private void processReductionStep(JobWorkCursor<?, ?, ?> theWorkCursor) {
JobWorkNotification workNotification = new JobWorkNotification(
myInstance,
null);
if (!result.isSuccessful()) {
myInstance.setEndTime(new Date());
myJobInstanceStatusUpdater.setFailed(myInstance);
}
theWorkCursor.nextStep.getStepId(),
Batch2Constants.REDUCTION_STEP_CHUNK_ID_PLACEHOLDER // chunk id; we don't need it
);
ourLog.debug("Submitting a Work Notification for a job reduction step. No associated work chunk ids are available.");
myBatchJobSender.sendWorkChannelMessage(workNotification);
}
}

View File

@ -179,6 +179,16 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc
return myRunMaintenanceSemaphore.getQueueLength();
}
@VisibleForTesting
public void forceMaintenancePass() {
// to simulate a long running job!
ourLog.info(
"Forcing a maintenance pass run; semaphore at {}",
getQueueLength()
);
doMaintenancePass();
}
@Override
public void runMaintenancePass() {
if (!myRunMaintenanceSemaphore.tryAcquire()) {
@ -204,6 +214,7 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc
myJobDefinitionRegistry.setJobDefinition(instance);
JobInstanceProcessor jobInstanceProcessor = new JobInstanceProcessor(myJobPersistence,
myBatchJobSender, instance, progressAccumulator, myJobExecutorSvc);
ourLog.debug("Triggering maintenance process for instance {} in status {}", instance.getInstanceId(), instance.getStatus().name());
jobInstanceProcessor.process();
}
}

View File

@ -24,6 +24,7 @@ import ca.uhn.fhir.batch2.api.IJobInstance;
import ca.uhn.fhir.jpa.util.JsonDateDeserializer;
import ca.uhn.fhir.jpa.util.JsonDateSerializer;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.Logs;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
@ -32,7 +33,6 @@ import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import java.util.Date;
import java.util.Objects;
import static org.apache.commons.lang3.StringUtils.isBlank;
@ -359,10 +359,24 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson,
}
/**
* Returns true if the job instance is in {@link StatusEnum#IN_PROGRESS} and is not cancelled
* Returns true if the job instance is in:
* {@link StatusEnum#IN_PROGRESS}
* {@link StatusEnum#FINALIZE}
* and is not cancelled
*/
public boolean isRunning() {
return getStatus() == StatusEnum.IN_PROGRESS && !isCancelled();
if (isCancelled()) {
return false;
}
switch (getStatus()) {
case IN_PROGRESS:
case FINALIZE:
return true;
default:
Logs.getBatchTroubleshootingLog().debug("Status {} is considered \"not running\"", getStatus().name());
}
return false;
}
public boolean isFinished() {
@ -376,7 +390,7 @@ public class JobInstance extends JobInstanceStartRequest implements IModelJson,
}
public boolean isPendingCancellationRequest() {
return myCancelled && (myStatus == StatusEnum.QUEUED || myStatus == StatusEnum.IN_PROGRESS);
return myCancelled && myStatus.isCancellable();
}
/**

View File

@ -34,51 +34,53 @@ public enum StatusEnum {
/**
* Task is waiting to execute and should begin with no intervention required.
*/
QUEUED(true, false),
QUEUED(true, false, true),
/**
* Task is current executing
*/
IN_PROGRESS(true, false),
IN_PROGRESS(true, false, true),
/**
* For reduction steps
*/
FINALIZE(true, false),
FINALIZE(true, false, true),
/**
* Task completed successfully
*/
COMPLETED(false, true),
COMPLETED(false, true, false),
/**
* Task execution resulted in an error but the error may be transient (or transient status is unknown).
* Retrying may result in success.
*/
ERRORED(true, true),
ERRORED(true, true, false),
/**
* Task has failed and is known to be unrecoverable. There is no reason to believe that retrying will
* result in a different outcome.
*/
FAILED(true, true),
FAILED(true, true, false),
/**
* Task has been cancelled.
*/
CANCELLED(true, true);
CANCELLED(true, true, false);
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
private final boolean myIncomplete;
private final boolean myEnded;
private final boolean myIsCancellable;
private static StatusEnum[] ourIncompleteStatuses;
private static Set<StatusEnum> ourEndedStatuses;
private static Set<StatusEnum> ourNotEndedStatuses;
StatusEnum(boolean theIncomplete, boolean theEnded) {
StatusEnum(boolean theIncomplete, boolean theEnded, boolean theIsCancellable) {
myIncomplete = theIncomplete;
myEnded = theEnded;
myIsCancellable = theIsCancellable;
}
/**
@ -186,4 +188,8 @@ public enum StatusEnum {
public boolean isIncomplete() {
return myIncomplete;
}
public boolean isCancellable() {
return myIsCancellable;
}
}

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.batch2.config;
package ca.uhn.fhir.batch2.util;
/*-
* #%L
@ -30,4 +30,10 @@ public class Batch2Constants {
* date when performing operations that pull resources by time windows.
*/
public static final Date BATCH_START_DATE = new InstantType("2000-01-01T00:00:00Z").getValue();
/**
* This is a placeholder chunkid for the reduction step to allow it to be
* used in the message handling
*/
public static final String REDUCTION_STEP_CHUNK_ID_PLACEHOLDER = "REDUCTION";
}

View File

@ -26,7 +26,7 @@ import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.config.Batch2Constants;
import ca.uhn.fhir.batch2.util.Batch2Constants;
import ca.uhn.fhir.mdm.batch2.clear.MdmClearJobParameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -369,9 +369,10 @@ public class BulkDataExportProvider {
myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToWriter(oo, response.getWriter());
response.getWriter().close();
break;
default:
ourLog.warn("Unrecognized status encountered: {}. Treating as BUILDING/SUBMITTED", info.getStatus().name());
case BUILDING:
case SUBMITTED:
default:
if (theRequestDetails.getRequestType() == RequestTypeEnum.DELETE) {
handleDeleteRequest(theJobId, response, info.getStatus());
} else {

View File

@ -2217,7 +2217,7 @@ public class JsonParserDstu3Test {
input = "{\"resourceType\":\"Basic\",\"id\":\"1\",\"text\":{\"status\":\"generated\",\"div\":\"<div></div>\"}}";
basic = ourCtx.newJsonParser().parseResource(Basic.class, input);
assertEquals("<div xmlns=\"http://www.w3.org/1999/xhtml\"/>", basic.getText().getDivAsString());
assertEquals("<div xmlns=\"http://www.w3.org/1999/xhtml\"></div>", basic.getText().getDivAsString());
input = "{\"resourceType\":\"Basic\",\"id\":\"1\",\"text\":{\"status\":\"generated\",\"div\":\"<div> </div>\"}}";
basic = ourCtx.newJsonParser().parseResource(Basic.class, input);

View File

@ -447,4 +447,9 @@ public final class HapiWorkerContext extends I18nBase implements IWorkerContext
public IWorkerContext setPackageTracker(IWorkerContextManager.IPackageLoadingTracker theIPackageLoadingTracker) {
throw new UnsupportedOperationException(Msg.code(220));
}
@Override
public String getSpecUrl() {
throw new UnsupportedOperationException(Msg.code(2260));
}
}

View File

@ -122,7 +122,6 @@ resource.Questionnaire=org.hl7.fhir.r5.model.Questionnaire
resource.QuestionnaireResponse=org.hl7.fhir.r5.model.QuestionnaireResponse
resource.RegulatedAuthorization=org.hl7.fhir.r5.model.RegulatedAuthorization
resource.RelatedPerson=org.hl7.fhir.r5.model.RelatedPerson
resource.RequestGroup=org.hl7.fhir.r5.model.RequestGroup
resource.RequestOrchestration=org.hl7.fhir.r5.model.RequestOrchestration
resource.Requirements=org.hl7.fhir.r5.model.Requirements
resource.ResearchStudy=org.hl7.fhir.r5.model.ResearchStudy

View File

@ -0,0 +1,82 @@
package ca.uhn.fhir.test.utilities;
/*-
* #%L
* HAPI FHIR Test Utilities
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertNotNull;
public class RangeTestHelper {
public static final double THOUSANDTH = .001d;
public static void checkInRange(double base, double value) {
checkInRange(base, THOUSANDTH, value);
}
public static void checkInRange(double theBase, double theRange, double theValue) {
double lowerBound = theBase - theRange;
double upperBound = theBase + theRange;
checkWithinBounds(lowerBound, upperBound, theValue);
}
public static void checkInRange(String theBase, String theValue) {
// ease tests
if (theBase == null && theValue == null) {
return;
}
double value = Double.parseDouble(theValue);
double base = Double.parseDouble(theBase);
checkInRange(base, THOUSANDTH, value);
}
public static void checkInRange(String theBase, double theRange, String theValue) {
// ease tests
if (theBase == null && theValue == null) {
return;
}
double value = Double.parseDouble(theValue);
double base = Double.parseDouble(theBase);
checkInRange(base, theRange, value);
}
public static void checkWithinBounds(double theLowerBound, double theUpperBound, double theValue) {
assertThat(theValue, is(both(greaterThanOrEqualTo(theLowerBound)).and(lessThanOrEqualTo(theUpperBound))));
}
public static void checkWithinBounds(String theLowerBound, String theUpperBound, String theValue) {
assertNotNull(theLowerBound, "theLowerBound");
assertNotNull(theUpperBound, "theUpperBound");
assertNotNull(theValue, "theValue");
double lowerBound = Double.parseDouble(theLowerBound);
double upperBound = Double.parseDouble(theUpperBound);
double value = Double.parseDouble(theValue);
checkWithinBounds(lowerBound, upperBound, value);
}
}

View File

@ -0,0 +1,163 @@
package ca.uhn.fhir.test.utilities;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertThrows;
class RangeTestHelperTest {
@Nested
public class DefaultRange {
@Test
void checkInRange() {
RangeTestHelper.checkInRange(.83d, .829999999d);
}
@Test
void checkLower() {
AssertionError thrown = assertThrows(
AssertionError.class,
() -> RangeTestHelper.checkInRange(.91, .83)
);
}
@Test
void checkHigher() {
AssertionError thrown = assertThrows(
AssertionError.class,
() -> RangeTestHelper.checkInRange(.26, .25)
);
}
@Nested
public class WithinBounds {
@Test
void checkInRange() {
RangeTestHelper.checkWithinBounds(.91001, .91002, .910013);
}
@Test
void checkLower() {
AssertionError thrown = assertThrows(
AssertionError.class,
() -> RangeTestHelper.checkWithinBounds(.91001, .91002, .9013)
);
}
@Test
void checkHigher() {
AssertionError thrown = assertThrows(
AssertionError.class,
() -> RangeTestHelper.checkWithinBounds(.87, .88, .9)
);
}
@Nested
public class PassingStrings {
@Test
void checkInRange() {
RangeTestHelper.checkWithinBounds(".91001", ".91002", ".910013");
}
@Test
void checkLower() {
AssertionError thrown = assertThrows(
AssertionError.class,
() -> RangeTestHelper.checkWithinBounds(".91001", ".91002", ".9013")
);
}
@Test
void checkHigher() {
AssertionError thrown = assertThrows(
AssertionError.class,
() -> RangeTestHelper.checkWithinBounds(".87", ".88", ".9")
);
}
}
}
@Nested
public class PassingStrings {
@Test
void checkInRange() {
RangeTestHelper.checkInRange("0.83", "0.829999999");
}
@Test
void checkLower() {
AssertionError thrown = assertThrows(
AssertionError.class,
() -> RangeTestHelper.checkInRange(".91", ".83")
);
}
@Test
void checkHigher() {
AssertionError thrown = assertThrows(
AssertionError.class,
() -> RangeTestHelper.checkInRange(".26", "0.25")
);
}
}
}
@Nested
public class ProvidedRange {
@Test
void checkInRange() {
// equals to higher bound
RangeTestHelper.checkInRange(.83, .1, .83);
RangeTestHelper.checkInRange(.831, .02, .833);
}
@Test
void checkLower() {
AssertionError thrown = assertThrows(
AssertionError.class,
() -> RangeTestHelper.checkInRange(.84, .01, .82)
);
}
@Test
void checkHigher() {
AssertionError thrown = assertThrows(
AssertionError.class,
() -> RangeTestHelper.checkInRange(.2511,.0001, .2513)
);
}
@Nested
public class PassingStrings {
@Test
void checkInRange() {
RangeTestHelper.checkInRange(".82", .01, ".83");
RangeTestHelper.checkInRange(".83d", .829999999d, ".8312d");
}
@Test
void checkLower() {
AssertionError thrown = assertThrows(
AssertionError.class,
() -> RangeTestHelper.checkInRange(".91", .02, ".83")
);
}
@Test
void checkHigher() {
AssertionError thrown = assertThrows(
AssertionError.class,
() -> RangeTestHelper.checkInRange(".26", .03, "0.3")
);
}
}
}
}

View File

@ -171,6 +171,13 @@ public class VersionSpecificWorkerContextWrapper extends I18nBase implements IWo
throw new UnsupportedOperationException(Msg.code(2114));
}
@Override
public String getSpecUrl() {
return "";
}
@Override
public PackageInformation getPackageForUrl(String s) {
throw new UnsupportedOperationException(Msg.code(2109));

View File

@ -861,9 +861,11 @@
</license>
</licenses>
<properties>
<properties>
<fhir_core_version>5.6.881</fhir_core_version>
<ucum_version>1.0.3</ucum_version>
<surefire_jvm_args>-Dfile.encoding=UTF-8 -Xmx2048m</surefire_jvm_args>
<fhir_core_version>5.6.84</fhir_core_version>
<ucum_version>1.0.3</ucum_version>
<surefire_jvm_args>-Dfile.encoding=UTF-8 -Xmx2048m</surefire_jvm_args>