batch2 rework (#3964)

* consolidated Batch2Helper
check status in assert before running maintenance
ignore illegal job instance state transition

* Fix mock tests

* rename intermittent to IT

* extra logging

* into to debug

* review feedback

* added more logging

* removed fasttracking.  replaced with support for triggering a maintenance run

* fixed tests

* fixed tests

* add asserts

* licenses

* snakeyaml cve version bump

* data migration

* merge origin/master

* merge origin/master

* fix test (support triggering jobs when scheduling is disabled)

* add counter to abort potential infinite loop after 100000 calls

* compile error

* move multipartition mdm tests out to a separate IT
update mongo job instance creation with new logic

* fix race condition

* back out infinite loop circuit breaker.  it breaks terminology tests that depend on this loop running > 100000 times.

* fix test (triggered job key was missing group)

* revert saveAllDeferred

* Almost finished getting JpaPersistenceR4IT to pass

* testBulkExportDeleteForOnGoingJob still not passing

* testBulkExportDeleteForOnGoingJob still not passing

* Removed method `fetchInstanceAndMarkInProgress`.  It was clobbering other state transitions like CANCELLED.

* undo rename of cancelled to cancelRequested.  Thymeleaf depends on the old name

* ExpandResourcesStep requires a synchronous search

* Tests are passing but man testBulkExportDeleteForOnGoingJob is a flakey test

* fix test

* fix test

* change log

* allow ERRORED -> FAILED state transition

* add a timeout to saveAllDeferred()

* improve error message on timeout message

* fix tests

* added more logging

* added more logging

* fix semaphore issue

* Msg.code

* fix test

* javadoc

* switch all batch loggers to batch troubleshooting log

* add unit test, improve logging

* move TimeoutManager to its own class.  Add tests.

* fix tests

* comment cleanup

* test race condition

* review feedback

Co-authored-by: Ken Stevens <ken@smilecdr.com>
This commit is contained in:
Ken Stevens 2022-09-05 00:04:54 -04:00 committed by GitHub
parent a4a7fd9344
commit 100b8f9190
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
164 changed files with 1865 additions and 1084 deletions

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -25,7 +25,7 @@ public final class Msg {
/**
* IMPORTANT: Please update the following comment after you add a new code
* Last code value: 2132
* Last used code value: 2134
*/
private Msg() {}

View File

@ -414,7 +414,7 @@ public class StopWatch {
}
@VisibleForTesting
static void setNowForUnitTestForUnitTest(Long theNowForUnitTest) {
static public void setNowForUnitTest(Long theNowForUnitTest) {
ourNowForUnitTest = theNowForUnitTest;
}

View File

@ -0,0 +1,7 @@
package ca.uhn.fhir.util;
public class TimeoutException extends RuntimeException {
public TimeoutException(String theMessage) {
super(theMessage);
}
}

View File

@ -0,0 +1,52 @@
package ca.uhn.fhir.util;
import ca.uhn.fhir.i18n.Msg;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
public class TimeoutManager {
private static final Logger ourLog = LoggerFactory.getLogger(TimeoutManager.class);
private final StopWatch myStopWatch = new StopWatch();
private final String myServiceName;
private final Duration myWarningTimeout;
private final Duration myErrorTimeout;
private boolean warned = false;
private boolean errored = false;
public TimeoutManager(String theServiceName, Duration theWarningTimeout, Duration theErrorTimeout) {
myServiceName = theServiceName;
myWarningTimeout = theWarningTimeout;
myErrorTimeout = theErrorTimeout;
}
/**
*
* @return true if a message was logged
*/
public boolean checkTimeout() {
boolean retval = false;
if (myStopWatch.getMillis() > myWarningTimeout.toMillis() && !warned) {
ourLog.warn(myServiceName + " has run for {}", myStopWatch);
warned = true;
retval = true;
}
if (myStopWatch.getMillis() > myErrorTimeout.toMillis() && !errored) {
if ("true".equalsIgnoreCase(System.getProperty("unit_test_mode"))) {
throw new TimeoutException(Msg.code(2133) + myServiceName + " timed out after running for " + myStopWatch);
} else {
ourLog.error(myServiceName + " has run for {}", myStopWatch);
errored = true;
retval = true;
}
}
return retval;
}
public void addTimeForUnitTest(Duration theDuration) {
myStopWatch.setNowForUnitTest(myStopWatch.getStartedDate().getTime() + theDuration.toMillis());
}
}

View File

@ -9,7 +9,10 @@ import java.util.Date;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.matchesPattern;
import static org.hamcrest.Matchers.oneOf;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class StopWatchTest {
@ -18,7 +21,7 @@ public class StopWatchTest {
@AfterEach
public void after() {
StopWatch.setNowForUnitTestForUnitTest(null);
StopWatch.setNowForUnitTest(null);
}
private double calculateThroughput(int theMinutesElapsed, int theNumOperations) {
@ -30,70 +33,70 @@ public class StopWatchTest {
@Test
public void testEstimatedTimeRemainingOutOfOne() {
StopWatch.setNowForUnitTestForUnitTest(777777777L);
StopWatch.setNowForUnitTest(777777777L);
StopWatch sw = new StopWatch();
// Less than half
StopWatch.setNowForUnitTestForUnitTest(777777777L + DateUtils.MILLIS_PER_MINUTE);
StopWatch.setNowForUnitTest(777777777L + DateUtils.MILLIS_PER_MINUTE);
assertEquals("00:09:00", sw.getEstimatedTimeRemaining(0.1, 1.0));
StopWatch.setNowForUnitTestForUnitTest(777777777L + DateUtils.MILLIS_PER_MINUTE);
StopWatch.setNowForUnitTest(777777777L + DateUtils.MILLIS_PER_MINUTE);
assertEquals("00:09:00", sw.getEstimatedTimeRemaining(1, 10));
StopWatch.setNowForUnitTestForUnitTest(777777777L + DateUtils.MILLIS_PER_MINUTE + 100);
StopWatch.setNowForUnitTest(777777777L + DateUtils.MILLIS_PER_MINUTE + 100);
assertEquals("00:09:00", sw.getEstimatedTimeRemaining(0.1, 1.0));
StopWatch.setNowForUnitTestForUnitTest(777777777L + DateUtils.MILLIS_PER_MINUTE);
StopWatch.setNowForUnitTest(777777777L + DateUtils.MILLIS_PER_MINUTE);
assertEquals("00:19:00", sw.getEstimatedTimeRemaining(0.05, 1.0));
StopWatch.setNowForUnitTestForUnitTest(777777777L + DateUtils.MILLIS_PER_MINUTE);
StopWatch.setNowForUnitTest(777777777L + DateUtils.MILLIS_PER_MINUTE);
assertEquals("00:39:00", sw.getEstimatedTimeRemaining(0.025, 1.0));
// More than half
StopWatch.setNowForUnitTestForUnitTest(777777777L + DateUtils.MILLIS_PER_MINUTE);
StopWatch.setNowForUnitTest(777777777L + DateUtils.MILLIS_PER_MINUTE);
assertEquals("00:01:00.000", sw.getEstimatedTimeRemaining(0.5, 1.0));
StopWatch.setNowForUnitTestForUnitTest(777777777L + DateUtils.MILLIS_PER_MINUTE);
StopWatch.setNowForUnitTest(777777777L + DateUtils.MILLIS_PER_MINUTE);
assertEquals("00:00:59.760", sw.getEstimatedTimeRemaining(0.501, 1.0));
StopWatch.setNowForUnitTestForUnitTest(777777777L + DateUtils.MILLIS_PER_MINUTE);
StopWatch.setNowForUnitTest(777777777L + DateUtils.MILLIS_PER_MINUTE);
assertEquals("00:00:40.000", sw.getEstimatedTimeRemaining(0.6, 1.0));
StopWatch.setNowForUnitTestForUnitTest(777777777L + DateUtils.MILLIS_PER_MINUTE);
StopWatch.setNowForUnitTest(777777777L + DateUtils.MILLIS_PER_MINUTE);
assertEquals("6666ms", sw.getEstimatedTimeRemaining(0.9, 1.0));
StopWatch.setNowForUnitTestForUnitTest(777777777L + DateUtils.MILLIS_PER_MINUTE);
StopWatch.setNowForUnitTest(777777777L + DateUtils.MILLIS_PER_MINUTE);
assertEquals("60ms", sw.getEstimatedTimeRemaining(0.999, 1.0));
}
@Test
public void testEstimatedTimeRemainingOutOfOneHundred() {
StopWatch.setNowForUnitTestForUnitTest(777777777L);
StopWatch.setNowForUnitTest(777777777L);
StopWatch sw = new StopWatch();
StopWatch.setNowForUnitTestForUnitTest(777777777L + (10 * DateUtils.MILLIS_PER_MINUTE));
StopWatch.setNowForUnitTest(777777777L + (10 * DateUtils.MILLIS_PER_MINUTE));
assertEquals("01:30:00", sw.getEstimatedTimeRemaining(10, 100));
StopWatch.setNowForUnitTestForUnitTest(777777777L + (DateUtils.MILLIS_PER_MINUTE));
StopWatch.setNowForUnitTest(777777777L + (DateUtils.MILLIS_PER_MINUTE));
assertEquals("00:04:00", sw.getEstimatedTimeRemaining(20, 100));
StopWatch.setNowForUnitTestForUnitTest(777777777L + (30 * DateUtils.MILLIS_PER_MINUTE));
StopWatch.setNowForUnitTest(777777777L + (30 * DateUtils.MILLIS_PER_MINUTE));
assertEquals("01:10:00", sw.getEstimatedTimeRemaining(30, 100));
StopWatch.setNowForUnitTestForUnitTest(777777777L + (40 * DateUtils.MILLIS_PER_MINUTE));
StopWatch.setNowForUnitTest(777777777L + (40 * DateUtils.MILLIS_PER_MINUTE));
assertEquals("01:00:00", sw.getEstimatedTimeRemaining(40, 100));
StopWatch.setNowForUnitTestForUnitTest(777777777L + (50 * DateUtils.MILLIS_PER_MINUTE));
StopWatch.setNowForUnitTest(777777777L + (50 * DateUtils.MILLIS_PER_MINUTE));
assertEquals("00:50:00", sw.getEstimatedTimeRemaining(50, 100));
StopWatch.setNowForUnitTestForUnitTest(777777777L + (60 * DateUtils.MILLIS_PER_MINUTE));
StopWatch.setNowForUnitTest(777777777L + (60 * DateUtils.MILLIS_PER_MINUTE));
assertEquals("00:40:00", sw.getEstimatedTimeRemaining(60, 100));
StopWatch.setNowForUnitTestForUnitTest(777777777L + (60 * DateUtils.MILLIS_PER_MINUTE));
StopWatch.setNowForUnitTest(777777777L + (60 * DateUtils.MILLIS_PER_MINUTE));
assertEquals("00:00:36.363", sw.getEstimatedTimeRemaining(99, 100));
StopWatch.setNowForUnitTestForUnitTest(777777777L + (60 * DateUtils.MILLIS_PER_MINUTE));
StopWatch.setNowForUnitTest(777777777L + (60 * DateUtils.MILLIS_PER_MINUTE));
assertEquals("360ms", sw.getEstimatedTimeRemaining(99.99, 100));
}
@ -116,13 +119,13 @@ public class StopWatchTest {
public void testFormatTaskDurations() {
StopWatch sw = new StopWatch();
StopWatch.setNowForUnitTestForUnitTest(1000L);
StopWatch.setNowForUnitTest(1000L);
sw.startTask("TASK1");
StopWatch.setNowForUnitTestForUnitTest(1500L);
StopWatch.setNowForUnitTest(1500L);
sw.startTask("TASK2");
StopWatch.setNowForUnitTestForUnitTest(1600L);
StopWatch.setNowForUnitTest(1600L);
String taskDurations = sw.formatTaskDurations();
ourLog.info(taskDurations);
assertEquals("TASK1: 500ms\nTASK2: 100ms", taskDurations);
@ -132,19 +135,19 @@ public class StopWatchTest {
public void testFormatTaskDurationsDelayBetweenTasks() {
StopWatch sw = new StopWatch();
StopWatch.setNowForUnitTestForUnitTest(1000L);
StopWatch.setNowForUnitTest(1000L);
sw.startTask("TASK1");
StopWatch.setNowForUnitTestForUnitTest(1500L);
StopWatch.setNowForUnitTest(1500L);
sw.endCurrentTask();
StopWatch.setNowForUnitTestForUnitTest(2000L);
StopWatch.setNowForUnitTest(2000L);
sw.startTask("TASK2");
StopWatch.setNowForUnitTestForUnitTest(2100L);
StopWatch.setNowForUnitTest(2100L);
sw.endCurrentTask();
StopWatch.setNowForUnitTestForUnitTest(2200L);
StopWatch.setNowForUnitTest(2200L);
String taskDurations = sw.formatTaskDurations();
ourLog.info(taskDurations);
assertEquals("TASK1: 500ms\n" +
@ -157,13 +160,13 @@ public class StopWatchTest {
public void testFormatTaskDurationsLongDelayBeforeStart() {
StopWatch sw = new StopWatch(0);
StopWatch.setNowForUnitTestForUnitTest(1000L);
StopWatch.setNowForUnitTest(1000L);
sw.startTask("TASK1");
StopWatch.setNowForUnitTestForUnitTest(1500L);
StopWatch.setNowForUnitTest(1500L);
sw.startTask("TASK2");
StopWatch.setNowForUnitTestForUnitTest(1600L);
StopWatch.setNowForUnitTest(1600L);
String taskDurations = sw.formatTaskDurations();
ourLog.info(taskDurations);
assertEquals("Before first task: 1000ms\nTASK1: 500ms\nTASK2: 100ms", taskDurations);

View File

@ -0,0 +1,115 @@
package ca.uhn.fhir.util;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Appender;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
@ExtendWith(MockitoExtension.class)
class TimeoutManagerTest {
public static final String TEST_SERVICE_NAME = "TEST TIMEOUT";
final Duration myWarningTimeout = Duration.ofDays(1);
final Duration myErrorTimeout = Duration.ofDays(10);
@Mock
private Appender<ILoggingEvent> myAppender;
@Captor
ArgumentCaptor<ILoggingEvent> myLoggingEvent;
TimeoutManager mySvc = new TimeoutManager(TEST_SERVICE_NAME, myWarningTimeout, myErrorTimeout);
@BeforeEach
void before() {
ch.qos.logback.classic.Logger logger = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(TimeoutManager.class);
logger.addAppender(myAppender);
}
@AfterEach
void after() {
ch.qos.logback.classic.Logger logger = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(TimeoutManager.class);
logger.detachAppender(myAppender);
verifyNoMoreInteractions(myAppender);
System.clearProperty("unit_test");
}
@Test
public void checkTimeout_noThreadholdHit_noLogging() {
// execute
assertFalse(mySvc.checkTimeout());
// verify
verifyNoInteractions(myAppender);
}
@Test
public void checkTimeout_warningThreadholdHit_warningLogged() {
// setup
mySvc.addTimeForUnitTest(Duration.ofDays(2));
// execute
assertTrue(mySvc.checkTimeout());
// verify
verify(myAppender, times(1)).doAppend(myLoggingEvent.capture());
ILoggingEvent event = myLoggingEvent.getValue();
assertEquals(Level.WARN, event.getLevel());
assertEquals(TEST_SERVICE_NAME + " has run for 2.0 days", event.getFormattedMessage());
}
@Test
public void checkTimeout_errorThreadholdHit_errorLogged() {
// setup
mySvc.addTimeForUnitTest(Duration.ofDays(20));
// execute
mySvc.checkTimeout();
// verify
verify(myAppender, times(2)).doAppend(myLoggingEvent.capture());
ILoggingEvent event1 = myLoggingEvent.getAllValues().get(0);
assertEquals(Level.WARN, event1.getLevel());
assertEquals(TEST_SERVICE_NAME + " has run for 20 days", event1.getFormattedMessage());
ILoggingEvent event2 = myLoggingEvent.getAllValues().get(1);
assertEquals(Level.ERROR, event2.getLevel());
assertEquals(TEST_SERVICE_NAME + " has run for 20 days", event2.getFormattedMessage());
}
@Test
public void checkTimeout_errorThreadholdHitInUnitTest_throwsException() {
// setup
System.setProperty("unit_test_mode", "true");
mySvc.addTimeForUnitTest(Duration.ofDays(20));
// execute
try {
mySvc.checkTimeout();
fail();
} catch (TimeoutException e) {
assertEquals("HAPI-2133: TEST TIMEOUT timed out after running for 20 days", e.getMessage());
}
verify(myAppender, times(1)).doAppend(myLoggingEvent.capture());
verify(myAppender, times(1)).doAppend(myLoggingEvent.capture());
ILoggingEvent event = myLoggingEvent.getValue();
assertEquals(Level.WARN, event.getLevel());
assertEquals(TEST_SERVICE_NAME + " has run for 20 days", event.getFormattedMessage());
}
}

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -3,14 +3,14 @@
<modelVersion>4.0.0</modelVersion>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-bom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<packaging>pom</packaging>
<name>HAPI FHIR BOM</name>
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -20,9 +20,9 @@ package ca.uhn.fhir.cli;
* #L%
*/
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.FhirVersionEnum;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.client.interceptor.LoggingInterceptor;
import com.google.common.collect.Sets;

View File

@ -20,8 +20,8 @@ package ca.uhn.fhir.cli;
* #L%
*/
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.context.FhirVersionEnum;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import org.apache.commons.cli.CommandLine;

View File

@ -2,8 +2,8 @@ package ca.uhn.fhir.cli;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.test.utilities.TlsAuthenticationTestHelper;
import ca.uhn.fhir.test.utilities.RestServerDstu3Helper;
import ca.uhn.fhir.test.utilities.TlsAuthenticationTestHelper;
import ca.uhn.fhir.util.TestUtil;
import org.hl7.fhir.dstu3.model.Bundle;
import org.hl7.fhir.dstu3.model.ConceptMap;
@ -12,6 +12,7 @@ import org.hl7.fhir.dstu3.model.ConceptMap.SourceElementComponent;
import org.hl7.fhir.dstu3.model.ConceptMap.TargetElementComponent;
import org.hl7.fhir.dstu3.model.Enumerations.ConceptMapEquivalence;
import org.hl7.fhir.exceptions.FHIRException;
import org.hl7.fhir.r4.model.Enumerations;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -128,6 +129,7 @@ public class ImportCsvToConceptMapCommandDstu3Test {
"-i", VS_URL_1,
"-o", VS_URL_2,
"-f", myFile,
"-s", Enumerations.PublicationStatus.ACTIVE.toCode(),
"-l"
},
"-t", theIncludeTls, myRestServerDstu3Helper
@ -328,6 +330,7 @@ public class ImportCsvToConceptMapCommandDstu3Test {
"-i", VS_URL_1,
"-o", VS_URL_2,
"-f", myFile,
"-s", Enumerations.PublicationStatus.ACTIVE.toCode(),
"-l"
},
"-t", theIncludeTls, myRestServerDstu3Helper

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-cli</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
@ -111,7 +111,7 @@
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.26</version>
<version>1.31</version>
</dependency>
<dependency>

View File

@ -0,0 +1,6 @@
---
type: fix
issue: 3964
title: "Fast-tracking batch jobs that produced only one chunk has been rewritten to use Quartz triggerJob. This will
ensure that at most one thread is updating job status at a time. Also jobs that had FAILED, ERRORED, or been CANCELLED
could be accidentally set back to IN_PROGRESS; this has been corrected."

View File

@ -11,7 +11,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -41,4 +41,6 @@ public interface IHapiScheduler {
void scheduleJob(long theIntervalMillis, ScheduledJobDefinition theJobDefinition);
Set<JobKey> getJobKeysForUnitTest() throws SchedulerException;
default void triggerJobImmediately(ScheduledJobDefinition theJobDefinition) {}
}

View File

@ -54,4 +54,22 @@ public interface ISchedulerService {
Set<JobKey> getClusteredJobKeysForUnitTest() throws SchedulerException;
boolean isStopping();
/**
* Rather than waiting for the job to fire at its scheduled time, fire it immediately.
* @param theJobDefinition
*/
default void triggerLocalJobImmediately(ScheduledJobDefinition theJobDefinition) {}
/**
* Rather than waiting for the job to fire at its scheduled time, fire it immediately.
* @param theJobDefinition
*/
default void triggerClusteredJobImmediately(ScheduledJobDefinition theJobDefinition) {}
/**
* @return true if this server supports clustered scheduling
*/
default boolean isClusteredSchedulingEnabled() { return false; }
}

View File

@ -23,6 +23,8 @@ package ca.uhn.fhir.jpa.model.sched;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.quartz.Job;
import org.quartz.JobKey;
import org.quartz.TriggerKey;
import java.util.Collections;
import java.util.HashMap;
@ -86,4 +88,12 @@ public class ScheduledJobDefinition {
.append("myGroup", myGroup)
.toString();
}
public JobKey toJobKey() {
return new JobKey(getId(), getGroup());
}
public TriggerKey toTriggerKey() {
return new TriggerKey(getId(), getGroup());
}
}

View File

@ -52,7 +52,7 @@ public class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory impleme
String next = toString(bundle.getNextFireTime());
String fireInstanceId = bundle.getTrigger().getFireInstanceId();
JobKey key = bundle.getJobDetail().getKey();
ourLog.debug("Firing job[{}] ID[{}] - Previous[{}] Scheduled[{}] Next[{}]", key, fireInstanceId, prev, scheduled, next);
ourLog.trace("Firing job[{}] ID[{}] - Previous[{}] Scheduled[{}] Next[{}]", key, fireInstanceId, prev, scheduled, next);
Object job = super.createJobInstance(bundle);
myBeanFactory.autowireBean(job);

View File

@ -20,8 +20,8 @@ package ca.uhn.fhir.jpa.sched;
* #L%
*/
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.context.ConfigurationException;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.model.sched.IHapiScheduler;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
@ -178,13 +178,8 @@ public abstract class BaseHapiScheduler implements IHapiScheduler {
Validate.notNull(theJobDefinition.getJobClass());
Validate.notBlank(theJobDefinition.getId());
JobKey jobKey = new JobKey(theJobDefinition.getId(), theJobDefinition.getGroup());
TriggerKey triggerKey = new TriggerKey(theJobDefinition.getId(), theJobDefinition.getGroup());
JobDetailImpl jobDetail = new NonConcurrentJobDetailImpl();
jobDetail.setJobClass(theJobDefinition.getJobClass());
jobDetail.setKey(jobKey);
jobDetail.setJobDataMap(new JobDataMap(theJobDefinition.getJobData()));
TriggerKey triggerKey = theJobDefinition.toTriggerKey();
JobDetailImpl jobDetail = buildJobDetail(theJobDefinition);
ScheduleBuilder<? extends Trigger> schedule = SimpleScheduleBuilder
.simpleSchedule()
@ -208,6 +203,15 @@ public abstract class BaseHapiScheduler implements IHapiScheduler {
}
@Nonnull
private JobDetailImpl buildJobDetail(ScheduledJobDefinition theJobDefinition) {
JobDetailImpl jobDetail = new NonConcurrentJobDetailImpl();
jobDetail.setJobClass(theJobDefinition.getJobClass());
jobDetail.setKey(theJobDefinition.toJobKey());
jobDetail.setJobDataMap(new JobDataMap(theJobDefinition.getJobData()));
return jobDetail;
}
@VisibleForTesting
@Override
public Set<JobKey> getJobKeysForUnitTest() throws SchedulerException {
@ -223,4 +227,13 @@ public abstract class BaseHapiScheduler implements IHapiScheduler {
return true;
}
}
@Override
public void triggerJobImmediately(ScheduledJobDefinition theJobDefinition) {
try {
myScheduler.triggerJob(theJobDefinition.toJobKey());
} catch (SchedulerException e) {
ourLog.error("Error triggering scheduled job with key {}", theJobDefinition);
}
}
}

View File

@ -20,8 +20,8 @@ package ca.uhn.fhir.jpa.sched;
* #L%
*/
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.context.ConfigurationException;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.model.sched.IHapiScheduler;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ISmartLifecyclePhase;
@ -95,6 +95,7 @@ public abstract class BaseSchedulerServiceImpl implements ISchedulerService, Sma
myLocalSchedulingEnabled = theLocalSchedulingEnabled;
}
@Override
public boolean isClusteredSchedulingEnabled() {
return myClusteredSchedulingEnabled;
}
@ -107,6 +108,10 @@ public abstract class BaseSchedulerServiceImpl implements ISchedulerService, Sma
public void create() throws SchedulerException {
myLocalScheduler = createScheduler(false);
myClusteredScheduler = createScheduler(true);
if (isSchedulingDisabled()) {
setLocalSchedulingEnabled(false);
setClusteredSchedulingEnabled(false);
}
myStopping.set(false);
}
@ -206,9 +211,7 @@ public abstract class BaseSchedulerServiceImpl implements ISchedulerService, Sma
assert theJobDefinition.getJobClass() != null;
ourLog.info("Scheduling {} job {} with interval {}", theInstanceName, theJobDefinition.getId(), StopWatch.formatMillis(theIntervalMillis));
if (theJobDefinition.getGroup() == null) {
theJobDefinition.setGroup(myDefaultGroup);
}
defaultGroup(theJobDefinition);
theScheduler.scheduleJob(theIntervalMillis, theJobDefinition);
}
@ -234,4 +237,21 @@ public abstract class BaseSchedulerServiceImpl implements ISchedulerService, Sma
return myStopping.get();
}
@Override
public void triggerClusteredJobImmediately(ScheduledJobDefinition theJobDefinition) {
defaultGroup(theJobDefinition);
myClusteredScheduler.triggerJobImmediately(theJobDefinition);
}
@Override
public void triggerLocalJobImmediately(ScheduledJobDefinition theJobDefinition) {
defaultGroup(theJobDefinition);
myLocalScheduler.triggerJobImmediately(theJobDefinition);
}
private void defaultGroup(ScheduledJobDefinition theJobDefinition) {
if (theJobDefinition.getGroup() == null) {
theJobDefinition.setGroup(myDefaultGroup);
}
}
}

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -25,17 +25,16 @@ import ca.uhn.fhir.batch2.api.JobOperationResultJson;
import ca.uhn.fhir.batch2.coordinator.BatchWorkChunk;
import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
import ca.uhn.fhir.batch2.model.MarkWorkChunkAsErrorRequest;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
import ca.uhn.fhir.jpa.util.JobInstanceUtil;
import ca.uhn.fhir.model.api.PagingIterator;
import ca.uhn.fhir.narrative.BaseThymeleafNarrativeGenerator;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
@ -111,6 +110,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
entity.setStatus(theInstance.getStatus());
entity.setParams(theInstance.getParameters());
entity.setCurrentGatedStepId(theInstance.getCurrentGatedStepId());
entity.setFastTracking(theInstance.isFastTracking());
entity.setCreateTime(new Date());
entity.setStartTime(new Date());
entity.setReport(theInstance.getReport());
@ -119,12 +119,6 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
return entity.getId();
}
@Override
public Optional<JobInstance> fetchInstanceAndMarkInProgress(String theInstanceId) {
myJobInstanceRepository.updateInstanceStatus(theInstanceId, StatusEnum.IN_PROGRESS);
return fetchInstance(theInstanceId);
}
public List<JobInstance> fetchInstancesByJobDefinitionIdAndStatus(String theJobDefinitionId, Set<StatusEnum> theRequestedStatuses, int thePageSize, int thePageIndex) {
PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, "myCreateTime");
return toInstanceList(myJobInstanceRepository.fetchInstancesByJobDefinitionIdAndStatus(theJobDefinitionId, theRequestedStatuses, pageRequest));
@ -156,7 +150,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
@Override
@Nonnull
public Optional<JobInstance> fetchInstance(String theInstanceId) {
return myJobInstanceRepository.findById(theInstanceId).map(t -> toInstance(t));
return myJobInstanceRepository.findById(theInstanceId).map(this::toInstance);
}
@Override
@ -294,24 +288,25 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
int recordsChangedByStatusUpdate = myJobInstanceRepository.updateInstanceStatus(theInstance.getInstanceId(), theInstance.getStatus());
Optional<Batch2JobInstanceEntity> instanceOpt = myJobInstanceRepository.findById(theInstance.getInstanceId());
Batch2JobInstanceEntity instance = instanceOpt.orElseThrow(() -> new IllegalArgumentException("Unknown instance ID: " + theInstance.getInstanceId()));
Batch2JobInstanceEntity instanceEntity = instanceOpt.orElseThrow(() -> new IllegalArgumentException("Unknown instance ID: " + theInstance.getInstanceId()));
instance.setStartTime(theInstance.getStartTime());
instance.setEndTime(theInstance.getEndTime());
instance.setStatus(theInstance.getStatus());
instance.setCancelled(theInstance.isCancelled());
instance.setCombinedRecordsProcessed(theInstance.getCombinedRecordsProcessed());
instance.setCombinedRecordsProcessedPerSecond(theInstance.getCombinedRecordsProcessedPerSecond());
instance.setTotalElapsedMillis(theInstance.getTotalElapsedMillis());
instance.setWorkChunksPurged(theInstance.isWorkChunksPurged());
instance.setProgress(theInstance.getProgress());
instance.setErrorMessage(theInstance.getErrorMessage());
instance.setErrorCount(theInstance.getErrorCount());
instance.setEstimatedTimeRemaining(theInstance.getEstimatedTimeRemaining());
instance.setCurrentGatedStepId(theInstance.getCurrentGatedStepId());
instance.setReport(theInstance.getReport());
instanceEntity.setStartTime(theInstance.getStartTime());
instanceEntity.setEndTime(theInstance.getEndTime());
instanceEntity.setStatus(theInstance.getStatus());
instanceEntity.setCancelled(theInstance.isCancelled());
instanceEntity.setFastTracking(theInstance.isFastTracking());
instanceEntity.setCombinedRecordsProcessed(theInstance.getCombinedRecordsProcessed());
instanceEntity.setCombinedRecordsProcessedPerSecond(theInstance.getCombinedRecordsProcessedPerSecond());
instanceEntity.setTotalElapsedMillis(theInstance.getTotalElapsedMillis());
instanceEntity.setWorkChunksPurged(theInstance.isWorkChunksPurged());
instanceEntity.setProgress(theInstance.getProgress());
instanceEntity.setErrorMessage(theInstance.getErrorMessage());
instanceEntity.setErrorCount(theInstance.getErrorCount());
instanceEntity.setEstimatedTimeRemaining(theInstance.getEstimatedTimeRemaining());
instanceEntity.setCurrentGatedStepId(theInstance.getCurrentGatedStepId());
instanceEntity.setReport(theInstance.getReport());
myJobInstanceRepository.save(instance);
myJobInstanceRepository.save(instanceEntity);
return recordsChangedByStatusUpdate > 0;
}

View File

@ -36,7 +36,6 @@ import ca.uhn.fhir.jpa.bulk.export.model.ExportPIDIteratorParameters;
import ca.uhn.fhir.jpa.dao.IResultIterator;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.jpa.dao.index.IJpaIdHelperService;
import ca.uhn.fhir.jpa.dao.mdm.MdmExpansionCacheSvc;
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
@ -60,8 +59,6 @@ import org.hl7.fhir.instance.model.api.IBaseExtension;
import org.hl7.fhir.instance.model.api.IBaseReference;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.Patient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -161,7 +158,9 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor {
Set<String> expandedMemberResourceIds = expandAllPatientPidsFromGroup(theParams);
if (ourLog.isDebugEnabled()) {
ourLog.debug("Group/{} has been expanded to members:[{}]", theParams, String.join(",", expandedMemberResourceIds));
if (!expandedMemberResourceIds.isEmpty()) {
ourLog.debug("Group {} has been expanded to members:[{}]", theParams.getResourceType(), String.join(",", expandedMemberResourceIds));
}
}
//Next, let's search for the target resources, with their correct patient references, chunked.

View File

@ -83,6 +83,8 @@ public class Batch2JobInstanceEntity implements Serializable {
@Column(name = "JOB_CANCELLED", nullable = false)
private boolean myCancelled;
@Column(name = "FAST_TRACKING", nullable = true)
private Boolean myFastTracking;
@Column(name = "PARAMS_JSON", length = PARAMS_JSON_MAX_LENGTH, nullable = true)
private String myParamsJson;
@Lob
@ -299,4 +301,18 @@ public class Batch2JobInstanceEntity implements Serializable {
.append("report", myReport)
.toString();
}
/**
* @return true if every step of the job has produced exactly 1 chunk.
*/
public boolean isFastTracking() {
if (myFastTracking == null) {
myFastTracking = false;
}
return myFastTracking;
}
public void setFastTracking(boolean theFastTracking) {
myFastTracking = theFastTracking;
}
}

View File

@ -85,6 +85,18 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
init570(); // 20211102 -
init600(); // 20211102 -
init610();
init620();
}
private void init620() {
Builder version = forVersion(VersionEnum.V6_2_0);
// add new REPORT column to BATCH2 tables
version
.onTable("BT2_JOB_INSTANCE")
.addColumn("20220830.1", "FAST_TRACKING")
.nullable()
.type(ColumnTypeEnum.BOOLEAN);
}
private void init610() {

View File

@ -20,41 +20,17 @@ package ca.uhn.fhir.jpa.partition;
* #L%
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.ReadPartitionIdRequestDetails;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.entity.PartitionEntity;
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
import ca.uhn.fhir.jpa.model.entity.PartitionablePartitionId;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import static ca.uhn.fhir.jpa.model.util.JpaConstants.ALL_PARTITIONS_NAME;
import static ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster.doCallHooks;
import static ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster.doCallHooksAndReturnObject;
import static ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster.hasHooks;
public class RequestPartitionHelperSvc extends BaseRequestPartitionHelperSvc {

View File

@ -60,9 +60,7 @@ import org.hl7.fhir.r4.model.ConceptMap;
import org.hl7.fhir.r4.model.ValueSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronizationManager;

View File

@ -42,8 +42,10 @@ import ca.uhn.fhir.jpa.term.api.ITermVersionAdapterSvc;
import ca.uhn.fhir.jpa.term.models.TermCodeSystemDeleteJobParameters;
import ca.uhn.fhir.jpa.term.models.TermCodeSystemDeleteVersionJobParameters;
import ca.uhn.fhir.util.StopWatch;
import ca.uhn.fhir.util.TimeoutManager;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.hl7.fhir.r4.model.ConceptMap;
import org.hl7.fhir.r4.model.ValueSet;
import org.quartz.JobExecutionContext;
@ -57,6 +59,8 @@ import org.springframework.transaction.support.TransactionSynchronizationManager
import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.PostConstruct;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -72,6 +76,8 @@ import static ca.uhn.fhir.jpa.batch.config.BatchConstants.TERM_CODE_SYSTEM_VERSI
public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc {
private static final Logger ourLog = LoggerFactory.getLogger(TermDeferredStorageSvcImpl.class);
private static final long SAVE_ALL_DEFERRED_WARN_MINUTES = 1;
private static final long SAVE_ALL_DEFERRED_ERROR_MINUTES = 5;
private final List<TermCodeSystem> myDeferredCodeSystemsDeletions = Collections.synchronizedList(new ArrayList<>());
private final Queue<TermCodeSystemVersion> myDeferredCodeSystemVersionsDeletions = new ConcurrentLinkedQueue<>();
private final List<TermConcept> myDeferredConcepts = Collections.synchronizedList(new ArrayList<>());
@ -268,7 +274,14 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc {
@Override
public void saveAllDeferred() {
TimeoutManager timeoutManager = new TimeoutManager(TermDeferredStorageSvcImpl.class.getName() + ".saveAllDeferred()",
Duration.of(SAVE_ALL_DEFERRED_WARN_MINUTES, ChronoUnit.MINUTES),
Duration.of(SAVE_ALL_DEFERRED_ERROR_MINUTES, ChronoUnit.MINUTES));
while (!isStorageQueueEmpty()) {
if (timeoutManager.checkTimeout()) {
ourLog.info(toString());
}
saveDeferred();
}
}
@ -353,7 +366,7 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc {
TermCodeSystemDeleteVersionJobParameters parameters = new TermCodeSystemDeleteVersionJobParameters();
parameters.setCodeSystemVersionPid(theCodeSystemVersionPid);
request.setParameters(parameters);
Batch2JobStartResponse response = myJobCoordinator.startInstance(request);
myJobExecutions.add(response.getJobId());
}
@ -500,5 +513,17 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc {
}
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("myDeferredCodeSystemsDeletions", myDeferredCodeSystemsDeletions.size())
.append("myDeferredCodeSystemVersionsDeletions", myDeferredCodeSystemVersionsDeletions.size())
.append("myDeferredConcepts", myDeferredConcepts.size())
.append("myDeferredValueSets", myDeferredValueSets.size())
.append("myDeferredConceptMaps", myDeferredConceptMaps.size())
.append("myConceptLinksToSaveLater", myConceptLinksToSaveLater.size())
.append("myJobExecutions", myJobExecutions.size())
.append("myProcessDeferred", myProcessDeferred)
.toString();
}
}

View File

@ -44,6 +44,7 @@ public class JobInstanceUtil {
retVal.setJobDefinitionVersion(theEntity.getDefinitionVersion());
retVal.setStatus(theEntity.getStatus());
retVal.setCancelled(theEntity.isCancelled());
retVal.setFastTracking(theEntity.isFastTracking());
retVal.setStartTime(theEntity.getStartTime());
retVal.setCreateTime(theEntity.getCreateTime());
retVal.setEndTime(theEntity.getEndTime());

View File

@ -321,6 +321,7 @@ class JpaJobPersistenceImplTest {
entity.setEndTime(new Date(2000, 2, 3));
entity.setStatus(StatusEnum.COMPLETED);
entity.setCancelled(true);
entity.setFastTracking(true);
entity.setCombinedRecordsProcessed(12);
entity.setCombinedRecordsProcessedPerSecond(2d);
entity.setTotalElapsedMillis(1000);

View File

@ -22,9 +22,6 @@ import ca.uhn.fhir.mdm.model.MdmPidTuple;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.server.bulk.BulkDataExportOptions;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.param.HasOrListParam;
import ca.uhn.fhir.rest.param.HasParam;
import ch.qos.logback.classic.spi.ILoggingEvent;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.Group;
@ -34,7 +31,6 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatcher;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Spy;
@ -52,7 +48,6 @@ import java.util.List;
import java.util.Optional;
import java.util.Set;
import static ca.uhn.fhir.rest.api.Constants.PARAM_HAS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@ -62,7 +57,6 @@ import static org.mockito.AdditionalMatchers.not;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

View File

@ -7,7 +7,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -29,7 +29,6 @@ import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.jpa.test.Batch2JobHelper;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.JsonUtil;
import ca.uhn.test.concurrency.LatchTimedOutError;
import ca.uhn.test.concurrency.PointcutLatch;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.junit.jupiter.api.AfterEach;
@ -50,13 +49,10 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import static ca.uhn.fhir.batch2.config.BaseBatch2Config.CHANNEL_NAME;
import static ca.uhn.fhir.batch2.coordinator.StepExecutionSvc.MAX_CHUNK_ERROR_COUNT;
import static ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor.MAX_CHUNK_ERROR_COUNT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -193,7 +189,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(request);
myFirstStepLatch.awaitExpected();
myBatch2JobHelper.awaitSingleChunkJobCompletion(startResponse.getJobId());
myBatch2JobHelper.awaitJobCompletion(startResponse.getJobId());
}
@Test
@ -205,88 +201,22 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
};
IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> lastStep = (step, sink) -> callLatch(myLastStepLatch, step);
String jobId = "test-job-2";
JobDefinition<? extends IModelJson> definition = buildGatedJobDefinition(jobId, firstStep, lastStep);
String jobDefId = "test-job-2";
JobDefinition<? extends IModelJson> definition = buildGatedJobDefinition(jobDefId, firstStep, lastStep);
myJobDefinitionRegistry.addJobDefinition(definition);
JobInstanceStartRequest request = buildRequest(jobId);
JobInstanceStartRequest request = buildRequest(jobDefId);
myFirstStepLatch.setExpectedCount(1);
myLastStepLatch.setExpectedCount(1);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(request);
String batchJobId = myJobCoordinator.startInstance(request).getJobId();
myFirstStepLatch.awaitExpected();
myBatch2JobHelper.assertNoGatedStep(startResponse.getJobId());
myBatch2JobHelper.assertFastTracking(batchJobId);
// Since there was only one chunk, the job should proceed without requiring a maintenance pass
myBatch2JobHelper.awaitSingleChunkJobCompletion(startResponse.getJobId());
myLastStepLatch.awaitExpected();
}
@Test
public void testFastTrack_Maintenance_do_not_both_call_CompletionHandler() throws InterruptedException {
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> firstStep = (step, sink) -> {
sink.accept(new FirstStepOutput());
callLatch(myFirstStepLatch, step);
return RunOutcome.SUCCESS;
};
IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> lastStep = (step, sink) -> callLatch(myLastStepLatch, step);
String jobId = "test-job-2a";
String completionHandlerLatchName = "Completion Handler";
PointcutLatch calledLatch = new PointcutLatch(completionHandlerLatchName);
CountDownLatch waitLatch = new CountDownLatch(2);
myCompletionHandler = details -> {
try {
calledLatch.call(details);
waitLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
};
JobDefinition<? extends IModelJson> definition = buildGatedJobDefinition(jobId, firstStep, lastStep);
myJobDefinitionRegistry.addJobDefinition(definition);
JobInstanceStartRequest request = buildRequest(jobId);
myFirstStepLatch.setExpectedCount(1);
myLastStepLatch.setExpectedCount(1);
calledLatch.setExpectedCount(1);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(request);
String instanceId = startResponse.getJobId();
myFirstStepLatch.awaitExpected();
calledLatch.awaitExpected();
myBatch2JobHelper.assertNoGatedStep(instanceId);
// Start a maintenance run in the background
ExecutorService executor = Executors.newSingleThreadExecutor();
// Now queue up the maintenance call
calledLatch.setExpectedCount(1);
executor.submit(() -> myJobMaintenanceService.runMaintenancePass());
// We should have only called the completion handler once
try {
// This test will pause for 5 seconds here. This should be more than enough time on most servers to hit the
// spot where the maintenance services calls the completion handler
calledLatch.awaitExpectedWithTimeout(5);
fail();
} catch (LatchTimedOutError e) {
assertEquals("HAPI-1483: " + completionHandlerLatchName + " PointcutLatch timed out waiting 5 seconds for latch to countdown from 1 to 0. Is 1.", e.getMessage());
}
// Now release the latches
waitLatch.countDown();
waitLatch.countDown(); // This shouldn't be necessary, but just in case
// Since there was only one chunk, the job should proceed without requiring a maintenance pass
myBatch2JobHelper.awaitSingleChunkJobCompletion(instanceId);
myBatch2JobHelper.awaitJobCompletion(batchJobId);
myLastStepLatch.awaitExpected();
}
@ -396,8 +326,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
IJobStepWorker<TestJobParameters, VoidModel, FirstStepOutput> firstStep = (step, sink) -> {
sink.accept(new FirstStepOutput());
sink.accept(new FirstStepOutput());
callLatch(myFirstStepLatch, step);
return RunOutcome.SUCCESS;
return callLatch(myFirstStepLatch, step);
};
IJobStepWorker<TestJobParameters, FirstStepOutput, VoidModel> lastStep = (step, sink) -> callLatch(myLastStepLatch, step);
@ -413,11 +342,12 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
String instanceId = startResponse.getJobId();
myFirstStepLatch.awaitExpected();
myBatch2JobHelper.awaitGatedStepId(FIRST_STEP_ID, instanceId);
myLastStepLatch.setExpectedCount(2);
myBatch2JobHelper.awaitJobCompletion(instanceId);
myLastStepLatch.awaitExpected();
// Now we've processed 2 chunks so we are no longer fast tracking
myBatch2JobHelper.assertNotFastTracking(instanceId);
}
@ -527,7 +457,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
JobInstanceStartRequest request = buildRequest(jobId);
myFirstStepLatch.setExpectedCount(1);
Batch2JobStartResponse response = myJobCoordinator.startInstance(request);
JobInstance instance = myBatch2JobHelper.awaitJobHitsStatusInTime(response.getJobId(),
JobInstance instance = myBatch2JobHelper.awaitJobHasStatus(response.getJobId(),
12, // we want to wait a long time (2 min here) cause backoff is incremental
StatusEnum.FAILED, StatusEnum.ERRORED // error states
);

View File

@ -61,9 +61,7 @@ public class JobInstanceRepositoryTest extends BaseJpaR4Test {
@Test
public void testServiceLogicIsCorrectWithStatuses() {
//Given
FetchJobInstancesRequest request = new FetchJobInstancesRequest(myJobDefinitionId, myParams);
request.addStatus(StatusEnum.IN_PROGRESS);
request.addStatus(StatusEnum.COMPLETED);
FetchJobInstancesRequest request = new FetchJobInstancesRequest(myJobDefinitionId, myParams, StatusEnum.IN_PROGRESS, StatusEnum.COMPLETED);
//When
List<JobInstance> jobInstances = myJobPersistenceSvc.fetchInstances(request, 0, 1000);

View File

@ -5,6 +5,7 @@ import ca.uhn.fhir.batch2.api.JobOperationResultJson;
import ca.uhn.fhir.batch2.coordinator.BatchWorkChunk;
import ca.uhn.fhir.batch2.jobs.imprt.NdJsonFileJson;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.MarkWorkChunkAsErrorRequest;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
@ -110,21 +111,21 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
String instanceId = mySvc.storeNewInstance(instance);
runInTransaction(() -> {
Batch2JobInstanceEntity instanceEntity = myJobInstanceRepository.findById(instanceId).orElseThrow(() -> new IllegalStateException());
Batch2JobInstanceEntity instanceEntity = myJobInstanceRepository.findById(instanceId).orElseThrow(IllegalStateException::new);
assertEquals(StatusEnum.QUEUED, instanceEntity.getStatus());
});
JobInstance foundInstance = mySvc.fetchInstanceAndMarkInProgress(instanceId).orElseThrow(() -> new IllegalStateException());
JobInstance foundInstance = mySvc.fetchInstance(instanceId).orElseThrow(IllegalStateException::new);
assertEquals(instanceId, foundInstance.getInstanceId());
assertEquals(JOB_DEFINITION_ID, foundInstance.getJobDefinitionId());
assertEquals(JOB_DEF_VER, foundInstance.getJobDefinitionVersion());
assertEquals(StatusEnum.IN_PROGRESS, foundInstance.getStatus());
assertEquals(StatusEnum.QUEUED, foundInstance.getStatus());
assertEquals(CHUNK_DATA, foundInstance.getParameters());
assertEquals(instance.getReport(), foundInstance.getReport());
runInTransaction(() -> {
Batch2JobInstanceEntity instanceEntity = myJobInstanceRepository.findById(instanceId).orElseThrow(() -> new IllegalStateException());
assertEquals(StatusEnum.IN_PROGRESS, instanceEntity.getStatus());
Batch2JobInstanceEntity instanceEntity = myJobInstanceRepository.findById(instanceId).orElseThrow(IllegalStateException::new);
assertEquals(StatusEnum.QUEUED, instanceEntity.getStatus());
});
}
@ -134,7 +135,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
String instanceId = mySvc.storeNewInstance(instance);
runInTransaction(() -> {
Batch2JobInstanceEntity instanceEntity = myJobInstanceRepository.findById(instanceId).orElseThrow(() -> new IllegalStateException());
Batch2JobInstanceEntity instanceEntity = myJobInstanceRepository.findById(instanceId).orElseThrow(IllegalStateException::new);
assertEquals(StatusEnum.QUEUED, instanceEntity.getStatus());
instanceEntity.setCancelled(true);
myJobInstanceRepository.save(instanceEntity);
@ -144,29 +145,16 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertTrue(result.getSuccess());
assertEquals("Job instance <" + instanceId + "> successfully cancelled.", result.getMessage());
JobInstance foundInstance = mySvc.fetchInstanceAndMarkInProgress(instanceId).orElseThrow(() -> new IllegalStateException());
JobInstance foundInstance = mySvc.fetchInstance(instanceId).orElseThrow(IllegalStateException::new);
assertEquals(instanceId, foundInstance.getInstanceId());
assertEquals(JOB_DEFINITION_ID, foundInstance.getJobDefinitionId());
assertEquals(JOB_DEF_VER, foundInstance.getJobDefinitionVersion());
assertEquals(StatusEnum.IN_PROGRESS, foundInstance.getStatus());
assertEquals(StatusEnum.QUEUED, foundInstance.getStatus());
assertTrue(foundInstance.isCancelled());
assertEquals(CHUNK_DATA, foundInstance.getParameters());
}
@Test
public void testFetchInstanceAndMarkInProgress() {
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
JobInstance foundInstance = mySvc.fetchInstanceAndMarkInProgress(instanceId).orElseThrow(() -> new IllegalStateException());
assertEquals(36, foundInstance.getInstanceId().length());
assertEquals(JOB_DEFINITION_ID, foundInstance.getJobDefinitionId());
assertEquals(JOB_DEF_VER, foundInstance.getJobDefinitionVersion());
assertEquals(StatusEnum.IN_PROGRESS, foundInstance.getStatus());
assertEquals(CHUNK_DATA, foundInstance.getParameters());
}
@Test
void testFetchInstancesByJobDefinitionId() {
JobInstance instance = createInstance();
@ -205,23 +193,23 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertNull(chunks.get(0).getData());
assertNull(chunks.get(1).getData());
assertNull(chunks.get(2).getData());
assertThat(chunks.stream().map(t -> t.getId()).collect(Collectors.toList()),
assertThat(chunks.stream().map(WorkChunk::getId).collect(Collectors.toList()),
contains(ids.get(0), ids.get(1), ids.get(2)));
chunks = mySvc.fetchWorkChunksWithoutData(instanceId, 3, 1);
assertThat(chunks.stream().map(t -> t.getId()).collect(Collectors.toList()),
assertThat(chunks.stream().map(WorkChunk::getId).collect(Collectors.toList()),
contains(ids.get(3), ids.get(4), ids.get(5)));
chunks = mySvc.fetchWorkChunksWithoutData(instanceId, 3, 2);
assertThat(chunks.stream().map(t -> t.getId()).collect(Collectors.toList()),
assertThat(chunks.stream().map(WorkChunk::getId).collect(Collectors.toList()),
contains(ids.get(6), ids.get(7), ids.get(8)));
chunks = mySvc.fetchWorkChunksWithoutData(instanceId, 3, 3);
assertThat(chunks.stream().map(t -> t.getId()).collect(Collectors.toList()),
assertThat(chunks.stream().map(WorkChunk::getId).collect(Collectors.toList()),
contains(ids.get(9)));
chunks = mySvc.fetchWorkChunksWithoutData(instanceId, 3, 4);
assertThat(chunks.stream().map(t -> t.getId()).collect(Collectors.toList()),
assertThat(chunks.stream().map(WorkChunk::getId).collect(Collectors.toList()),
empty());
}
@ -237,7 +225,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, null);
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(id).orElseThrow(() -> new IllegalArgumentException());
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(id).orElseThrow(IllegalArgumentException::new);
assertNull(chunk.getData());
}
@ -248,16 +236,16 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
String id = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, CHUNK_DATA);
assertNotNull(id);
runInTransaction(() -> assertEquals(StatusEnum.QUEUED, myWorkChunkRepository.findById(id).orElseThrow(() -> new IllegalArgumentException()).getStatus()));
runInTransaction(() -> assertEquals(StatusEnum.QUEUED, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(id).orElseThrow(() -> new IllegalArgumentException());
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(id).orElseThrow(IllegalArgumentException::new);
assertEquals(36, chunk.getInstanceId().length());
assertEquals(JOB_DEFINITION_ID, chunk.getJobDefinitionId());
assertEquals(JOB_DEF_VER, chunk.getJobDefinitionVersion());
assertEquals(StatusEnum.IN_PROGRESS, chunk.getStatus());
assertEquals(CHUNK_DATA, chunk.getData());
runInTransaction(() -> assertEquals(StatusEnum.IN_PROGRESS, myWorkChunkRepository.findById(id).orElseThrow(() -> new IllegalArgumentException()).getStatus()));
runInTransaction(() -> assertEquals(StatusEnum.IN_PROGRESS, myWorkChunkRepository.findById(id).orElseThrow(IllegalArgumentException::new).getStatus()));
}
@Test
@ -267,11 +255,11 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, CHUNK_DATA);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(StatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(() -> new IllegalArgumentException()).getStatus()));
runInTransaction(() -> assertEquals(StatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
sleepUntilTimeChanges();
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId).orElseThrow(() -> new IllegalArgumentException());
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(StatusEnum.IN_PROGRESS, chunk.getStatus());
assertNotNull(chunk.getCreateTime());
@ -279,13 +267,13 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertNull(chunk.getEndTime());
assertNull(chunk.getRecordsProcessed());
assertNotNull(chunk.getData());
runInTransaction(() -> assertEquals(StatusEnum.IN_PROGRESS, myWorkChunkRepository.findById(chunkId).orElseThrow(() -> new IllegalArgumentException()).getStatus()));
runInTransaction(() -> assertEquals(StatusEnum.IN_PROGRESS, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
sleepUntilTimeChanges();
mySvc.markWorkChunkAsCompletedAndClearData(chunkId, 50);
runInTransaction(() -> {
Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(() -> new IllegalArgumentException());
Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(StatusEnum.COMPLETED, entity.getStatus());
assertEquals(50, entity.getRecordsProcessed());
assertNotNull(entity.getCreateTime());
@ -325,19 +313,20 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(StatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(() -> new IllegalArgumentException()).getStatus()));
runInTransaction(() -> assertEquals(StatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
sleepUntilTimeChanges();
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId).orElseThrow(() -> new IllegalArgumentException());
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(StatusEnum.IN_PROGRESS, chunk.getStatus());
sleepUntilTimeChanges();
mySvc.markWorkChunkAsErroredAndIncrementErrorCount(chunkId, "This is an error message");
MarkWorkChunkAsErrorRequest request = new MarkWorkChunkAsErrorRequest().setChunkId(chunkId).setErrorMsg("This is an error message");
mySvc.markWorkChunkAsErroredAndIncrementErrorCount(request);
runInTransaction(() -> {
Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(() -> new IllegalArgumentException());
Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(StatusEnum.ERRORED, entity.getStatus());
assertEquals("This is an error message", entity.getErrorMessage());
assertNotNull(entity.getCreateTime());
@ -350,9 +339,10 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
// Mark errored again
mySvc.markWorkChunkAsErroredAndIncrementErrorCount(chunkId, "This is an error message 2");
MarkWorkChunkAsErrorRequest request2 = new MarkWorkChunkAsErrorRequest().setChunkId(chunkId).setErrorMsg("This is an error message 2");
mySvc.markWorkChunkAsErroredAndIncrementErrorCount(request2);
runInTransaction(() -> {
Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(() -> new IllegalArgumentException());
Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(StatusEnum.ERRORED, entity.getStatus());
assertEquals("This is an error message 2", entity.getErrorMessage());
assertNotNull(entity.getCreateTime());
@ -375,11 +365,11 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
String chunkId = storeWorkChunk(DEF_CHUNK_ID, STEP_CHUNK_ID, instanceId, SEQUENCE_NUMBER, null);
assertNotNull(chunkId);
runInTransaction(() -> assertEquals(StatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(() -> new IllegalArgumentException()).getStatus()));
runInTransaction(() -> assertEquals(StatusEnum.QUEUED, myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new).getStatus()));
sleepUntilTimeChanges();
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId).orElseThrow(() -> new IllegalArgumentException());
WorkChunk chunk = mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(SEQUENCE_NUMBER, chunk.getSequence());
assertEquals(StatusEnum.IN_PROGRESS, chunk.getStatus());
@ -387,7 +377,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
mySvc.markWorkChunkAsFailed(chunkId, "This is an error message");
runInTransaction(() -> {
Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(() -> new IllegalArgumentException());
Batch2WorkChunkEntity entity = myWorkChunkRepository.findById(chunkId).orElseThrow(IllegalArgumentException::new);
assertEquals(StatusEnum.FAILED, entity.getStatus());
assertEquals("This is an error message", entity.getErrorMessage());
assertNotNull(entity.getCreateTime());
@ -406,7 +396,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
assertFalse(mySvc.markInstanceAsCompleted(instanceId));
runInTransaction(() -> {
Batch2JobInstanceEntity entity = myJobInstanceRepository.findById(instanceId).orElseThrow(() -> new IllegalArgumentException());
Batch2JobInstanceEntity entity = myJobInstanceRepository.findById(instanceId).orElseThrow(IllegalArgumentException::new);
assertEquals(StatusEnum.COMPLETED, entity.getStatus());
});
}
@ -415,7 +405,7 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
public void testUpdateInstance() {
String instanceId = mySvc.storeNewInstance(createInstance());
JobInstance instance = mySvc.fetchInstance(instanceId).orElseThrow(() -> new IllegalArgumentException());
JobInstance instance = mySvc.fetchInstance(instanceId).orElseThrow(IllegalArgumentException::new);
assertEquals(instanceId, instance.getInstanceId());
assertFalse(instance.isWorkChunksPurged());
@ -432,12 +422,12 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
mySvc.updateInstance(instance);
runInTransaction(() -> {
Batch2JobInstanceEntity entity = myJobInstanceRepository.findById(instanceId).orElseThrow(() -> new IllegalArgumentException());
Batch2JobInstanceEntity entity = myJobInstanceRepository.findById(instanceId).orElseThrow(IllegalArgumentException::new);
assertEquals(instance.getStartTime().getTime(), entity.getStartTime().getTime());
assertEquals(instance.getEndTime().getTime(), entity.getEndTime().getTime());
});
JobInstance finalInstance = mySvc.fetchInstance(instanceId).orElseThrow(() -> new IllegalArgumentException());
JobInstance finalInstance = mySvc.fetchInstance(instanceId).orElseThrow(IllegalArgumentException::new);
assertEquals(instanceId, finalInstance.getInstanceId());
assertEquals(0.5d, finalInstance.getProgress());
assertTrue(finalInstance.isWorkChunksPurged());

View File

@ -1,6 +1,5 @@
package ca.uhn.fhir.jpa.bulk.imprt.svc;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
@ -16,8 +15,6 @@ import ca.uhn.fhir.jpa.bulk.imprt.model.ActivateJobResult;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobFileJson;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobJson;
import ca.uhn.fhir.jpa.bulk.imprt.model.JobFileRowProcessingModeEnum;
import ca.uhn.fhir.jpa.dao.data.IBulkImportJobDao;
import ca.uhn.fhir.jpa.dao.data.IBulkImportJobFileDao;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
@ -72,13 +69,6 @@ public class BulkDataImportR4Test extends BaseJpaR4Test implements ITestDataBuil
@Autowired
private IBulkDataImportSvc mySvc;
@Autowired
private IBulkImportJobDao myBulkImportJobDao;
@Autowired
private IBulkImportJobFileDao myBulkImportJobFileDao;
@Autowired
private IJobCoordinator myJobCoordinator;
@Autowired
private Batch2JobHelper myBatch2JobHelper;
@ -139,7 +129,7 @@ public class BulkDataImportR4Test extends BaseJpaR4Test implements ITestDataBuil
ActivateJobResult activateJobOutcome = mySvc.activateNextReadyJob();
assertTrue(activateJobOutcome.isActivated);
JobInstance instance = myBatch2JobHelper.awaitJobHitsStatusInTime(activateJobOutcome.jobId,
JobInstance instance = myBatch2JobHelper.awaitJobHasStatus(activateJobOutcome.jobId,
60,
StatusEnum.FAILED);

View File

@ -1,7 +1,10 @@
package ca.uhn.fhir.jpa.dao.r4;
import ca.uhn.fhir.interceptor.api.IInterceptorService;
import ca.uhn.fhir.jpa.model.entity.ResourceIndexedComboTokenNonUnique;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.searchparam.submit.interceptor.SearchParamValidatingInterceptor;
import ca.uhn.fhir.jpa.util.SqlQuery;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.param.DateParam;
import ca.uhn.fhir.rest.param.StringParam;
@ -14,7 +17,12 @@ import org.hl7.fhir.r4.model.Enumerations;
import org.hl7.fhir.r4.model.Enumerations.PublicationStatus;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.SearchParameter;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Comparator;
import java.util.List;
@ -24,8 +32,28 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class FhirResourceDaoR4ComboNonUniqueParamTest extends BaseComboParamsR4Test {
private static final Logger ourLog = LoggerFactory.getLogger(FhirResourceDaoR4ComboNonUniqueParamTest.class);
@Autowired
SearchParamValidatingInterceptor mySearchParamValidatingInterceptor;
@Autowired
IInterceptorService myInterceptorService;
private boolean myInterceptorFound = false;
@BeforeEach
public void removeInterceptor() {
myInterceptorFound = myInterceptorService.unregisterInterceptor(mySearchParamValidatingInterceptor);
}
@AfterEach
public void restoreInterceptor() {
if (myInterceptorFound) {
myInterceptorService.unregisterInterceptor(mySearchParamValidatingInterceptor);
}
}
private void createNamesAndGenderSp() {
SearchParameter sp = new SearchParameter();
@ -108,8 +136,14 @@ public class FhirResourceDaoR4ComboNonUniqueParamTest extends BaseComboParamsR4T
myCaptureQueriesListener.logSelectQueries();
assertThat(actual, containsInAnyOrder(id1.toUnqualifiedVersionless().getValue()));
String sql = myCaptureQueriesListener.getSelectQueries().get(0).getSql(true, false);
assertEquals("SELECT t0.RES_ID FROM HFJ_IDX_CMB_TOK_NU t0 WHERE (t0.IDX_STRING = 'Patient?family=FAMILY1%5C%7C&gender=http%3A%2F%2Fhl7.org%2Ffhir%2Fadministrative-gender%7Cmale&given=GIVEN1')", sql);
boolean found = false;
for (SqlQuery query : myCaptureQueriesListener.getSelectQueries()) {
String sql = query.getSql(true, false);
if ("SELECT t0.RES_ID FROM HFJ_IDX_CMB_TOK_NU t0 WHERE (t0.IDX_STRING = 'Patient?family=FAMILY1%5C%7C&gender=http%3A%2F%2Fhl7.org%2Ffhir%2Fadministrative-gender%7Cmale&given=GIVEN1')".equals(sql)) {
found = true;
}
}
assertTrue(found, "Found expected sql");
logCapturedMessages();
assertThat(myMessages.toString(), containsString("[INFO Using NON_UNIQUE index for query for search: Patient?family=FAMILY1%5C%7C&gender=http%3A%2F%2Fhl7.org%2Ffhir%2Fadministrative-gender%7Cmale&given=GIVEN1]"));

View File

@ -67,7 +67,7 @@ public class ReindexJobTest extends BaseJpaR4Test {
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
startRequest.setParameters(parameters);
Batch2JobStartResponse res = myJobCoordinator.startInstance(startRequest);
myBatch2JobHelper.awaitSingleChunkJobCompletion(res);
myBatch2JobHelper.awaitJobCompletion(res);
// validate
assertEquals(2, myObservationDao.search(SearchParameterMap.newSynchronous()).size());
@ -102,7 +102,7 @@ public class ReindexJobTest extends BaseJpaR4Test {
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
startRequest.setParameters(new ReindexJobParameters());
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(startRequest);
myBatch2JobHelper.awaitSingleChunkJobCompletion(startResponse);
myBatch2JobHelper.awaitJobCompletion(startResponse);
// validate
assertEquals(50, myObservationDao.search(SearchParameterMap.newSynchronous()).size());

View File

@ -36,7 +36,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.stream.Collectors;
import static org.hamcrest.MatcherAssert.assertThat;

View File

@ -99,7 +99,7 @@ public class MultitenantBatchOperationR4Test extends BaseMultitenantResourceProv
ourLog.info(myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(response));
String jobId = BatchHelperR4.jobIdFromBatch2Parameters(response);
myBatch2JobHelper.awaitSingleChunkJobCompletion(jobId);
myBatch2JobHelper.awaitJobCompletion(jobId);
assertThat(interceptor.requestPartitionIds, hasSize(3));
RequestPartitionId partitionId = interceptor.requestPartitionIds.get(0);
@ -154,7 +154,7 @@ public class MultitenantBatchOperationR4Test extends BaseMultitenantResourceProv
ourLog.info(myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(response));
StringType jobId = (StringType) response.getParameter(ProviderConstants.OPERATION_REINDEX_RESPONSE_JOB_ID);
myBatch2JobHelper.awaitSingleChunkJobCompletion(jobId.getValue());
myBatch2JobHelper.awaitJobCompletion(jobId.getValue());
// validate
List<String> alleleObservationIds = reindexTestHelper.getAlleleObservationIds(myClient);
@ -178,7 +178,7 @@ public class MultitenantBatchOperationR4Test extends BaseMultitenantResourceProv
ourLog.info(myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(response));
jobId = (StringType) response.getParameter(ProviderConstants.OPERATION_REINDEX_RESPONSE_JOB_ID);
myBatch2JobHelper.awaitSingleChunkJobCompletion(jobId.getValue());
myBatch2JobHelper.awaitJobCompletion(jobId.getValue());
myTenantClientInterceptor.setTenantId(DEFAULT_PARTITION_NAME);
@ -221,7 +221,7 @@ public class MultitenantBatchOperationR4Test extends BaseMultitenantResourceProv
ourLog.info(myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(response));
StringType jobId = (StringType) response.getParameter(ProviderConstants.OPERATION_REINDEX_RESPONSE_JOB_ID);
myBatch2JobHelper.awaitSingleChunkJobCompletion(jobId.getValue());
myBatch2JobHelper.awaitJobCompletion(jobId.getValue());
// validate
List<String> alleleObservationIds = reindexTestHelper.getAlleleObservationIds(myClient);

View File

@ -816,6 +816,7 @@ public class GiantTransactionPerfTest {
}
private static class MockSchedulerSvc implements ISchedulerService {
@Override
public void purgeAllScheduledJobsForUnitTest() {
throw new UnsupportedOperationException();
@ -850,6 +851,16 @@ public class GiantTransactionPerfTest {
public boolean isStopping() {
return false;
}
@Override
public void triggerLocalJobImmediately(ScheduledJobDefinition theJobDefinition) {
ISchedulerService.super.triggerLocalJobImmediately(theJobDefinition);
}
@Override
public void triggerClusteredJobImmediately(ScheduledJobDefinition theJobDefinition) {
ISchedulerService.super.triggerClusteredJobImmediately(theJobDefinition);
}
}
private static class MockServletRequest extends MockHttpServletRequest {

View File

@ -1,5 +1,6 @@
package ca.uhn.fhir.jpa.stresstest;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.provider.r4.BaseResourceProviderR4Test;
@ -603,7 +604,7 @@ public class StressTestR4Test extends BaseResourceProviderR4Test {
ourLog.info(myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(response));
String jobId = BatchHelperR4.jobIdFromBatch2Parameters(response);
myBatch2JobHelper.awaitJobCompletion(jobId, 60);
myBatch2JobHelper.awaitJobHasStatus(jobId, 60, StatusEnum.COMPLETED);
int deleteCount = myCaptureQueriesListener.getDeleteQueries().size();
myCaptureQueriesListener.logDeleteQueries();

View File

@ -18,7 +18,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
public class TerminologyLoaderSvcLoincJpaTest extends BaseJpaR4Test {
@Autowired private BatchJobHelper myBatchJobHelper;
@Autowired
private BatchJobHelper myBatchJobHelper;
private TermLoaderSvcImpl mySvc;
private ZipCollectionBuilder myFiles;
@ -63,7 +64,7 @@ public class TerminologyLoaderSvcLoincJpaTest extends BaseJpaR4Test {
mySvc.loadLoinc(myFiles.getFiles(), mySrd);
myTerminologyDeferredStorageSvc.saveAllDeferred();
myBatchJobHelper.awaitAllBulkJobCompletions(false, TERM_CODE_SYSTEM_VERSION_DELETE_JOB_NAME );
myBatchJobHelper.awaitAllBulkJobCompletions(false, TERM_CODE_SYSTEM_VERSION_DELETE_JOB_NAME);
runInTransaction(() -> {
assertEquals(1, myTermCodeSystemDao.count());
@ -89,7 +90,7 @@ public class TerminologyLoaderSvcLoincJpaTest extends BaseJpaR4Test {
TermTestUtil.addLoincMandatoryFilesWithPropertiesFileToZip(myFiles, "v268_loincupload.properties");
mySvc.loadLoinc(myFiles.getFiles(), mySrd);
myTerminologyDeferredStorageSvc.saveAllDeferred();
myBatchJobHelper.awaitAllBulkJobCompletions(false, TERM_CODE_SYSTEM_VERSION_DELETE_JOB_NAME );
myBatchJobHelper.awaitAllBulkJobCompletions(false, TERM_CODE_SYSTEM_VERSION_DELETE_JOB_NAME);
runInTransaction(() -> {
assertEquals(1, myTermCodeSystemDao.count());
@ -116,6 +117,14 @@ public class TerminologyLoaderSvcLoincJpaTest extends BaseJpaR4Test {
@Test
public void testLoadLoincVersionNotCurrent() throws IOException {
runInTransaction(() -> {
assertEquals(0, myTermCodeSystemDao.count());
assertEquals(0, myTermCodeSystemVersionDao.count());
assertEquals(0, myTermValueSetDao.count());
assertEquals(0, myTermConceptDao.count());
assertEquals(0, myTermConceptMapDao.count());
assertEquals(0, myResourceTableDao.count());
});
// Load LOINC marked as version 2.67
TermTestUtil.addLoincMandatoryFilesWithPropertiesFileToZip(myFiles, "v267_loincupload.properties");
@ -202,7 +211,7 @@ public class TerminologyLoaderSvcLoincJpaTest extends BaseJpaR4Test {
IBundleProvider codeSystems = myCodeSystemDao.search(SearchParameterMap.newSynchronous());
assertEquals(1, codeSystems.size());
CodeSystem codeSystem = (CodeSystem) codeSystems.getResources(0,1).get(0);
CodeSystem codeSystem = (CodeSystem) codeSystems.getResources(0, 1).get(0);
assertEquals("LOINC Code System (Testing Copy)", codeSystem.getTitle());
}

View File

@ -22,6 +22,7 @@ import org.hibernate.search.engine.search.query.SearchQuery;
import org.hibernate.search.mapper.orm.Search;
import org.hibernate.search.mapper.orm.common.EntityReference;
import org.hibernate.search.mapper.orm.session.SearchSession;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
@ -96,13 +97,11 @@ public class ReindexTerminologyHSearchR4Test extends BaseJpaR4Test {
@Autowired
private TermLoaderSvcImpl myTermLoaderSvc;
@Autowired
private ITermReadSvc myITermReadSvc;
@Autowired
private ITermConceptDao myTermConceptDao;
@Autowired
private ITermReadSvc myTermReadSvc;
@Test()
@Test
public void uploadLoincCodeSystem() throws FileNotFoundException, InterruptedException {
List<ITermLoaderSvc.FileDescriptor> myFileDescriptors = buildFileDescriptors();
@ -270,6 +269,7 @@ public class ReindexTerminologyHSearchR4Test extends BaseJpaR4Test {
}
@Override
@AfterEach
public void afterCleanupDao() {
if (CLEANUP_DATA) {
super.afterCleanupDao();
@ -277,6 +277,7 @@ public class ReindexTerminologyHSearchR4Test extends BaseJpaR4Test {
}
@Override
@AfterEach
public void afterResetInterceptors() {
if (CLEANUP_DATA) {
super.afterResetInterceptors();
@ -284,6 +285,7 @@ public class ReindexTerminologyHSearchR4Test extends BaseJpaR4Test {
}
@Override
@AfterEach
public void afterClearTerminologyCaches() {
if (CLEANUP_DATA) {
super.afterClearTerminologyCaches();
@ -291,6 +293,7 @@ public class ReindexTerminologyHSearchR4Test extends BaseJpaR4Test {
}
@Override
@AfterEach
public void afterPurgeDatabase() {
if (CLEANUP_DATA) {
super.afterPurgeDatabase();
@ -298,6 +301,7 @@ public class ReindexTerminologyHSearchR4Test extends BaseJpaR4Test {
}
@Override
@AfterEach
public void afterEachClearCaches() {
if (CLEANUP_DATA) {
super.afterEachClearCaches();

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -25,58 +25,67 @@ import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.context.ConfigurationException;
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.hamcrest.Matchers;
import org.springframework.beans.factory.annotation.Autowired;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.thymeleaf.util.ArrayUtils;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class Batch2JobHelper {
private static final Logger ourLog = LoggerFactory.getLogger(Batch2JobHelper.class);
private static final int BATCH_SIZE = 100;
@Autowired
private IJobMaintenanceService myJobMaintenanceService;
private final IJobMaintenanceService myJobMaintenanceService;
private final IJobCoordinator myJobCoordinator;
private final IJobPersistence myJobPersistence;
@Autowired
private IJobPersistence myJobPersistence;
@Autowired
private IJobCoordinator myJobCoordinator;
public Batch2JobHelper(IJobMaintenanceService theJobMaintenanceService, IJobCoordinator theJobCoordinator, IJobPersistence theJobPersistence) {
myJobMaintenanceService = theJobMaintenanceService;
myJobCoordinator = theJobCoordinator;
myJobPersistence = theJobPersistence;
}
public JobInstance awaitJobCompletion(Batch2JobStartResponse theStartResponse) {
return awaitJobCompletion(theStartResponse.getJobId());
}
public JobInstance awaitJobCompletion(String theId) {
return awaitJobCompletion(theId, 10);
return awaitJobHasStatus(theId, StatusEnum.COMPLETED);
}
public JobInstance awaitJobCancelled(String theId) {
return awaitJobHasStatus(theId, StatusEnum.CANCELLED);
}
public JobInstance awaitJobCompletion(String theId, int theSecondsToWait) {
return awaitJobHasStatus(theId, theSecondsToWait, StatusEnum.COMPLETED);
}
public JobInstance awaitJobHasStatus(String theId, StatusEnum... theExpectedStatus) {
return awaitJobHasStatus(theId, 10, theExpectedStatus);
}
public JobInstance awaitJobHasStatus(String theId, int theSecondsToWait, StatusEnum... theExpectedStatus) {
assert !TransactionSynchronizationManager.isActualTransactionActive();
try {
await()
.atMost(theSecondsToWait, TimeUnit.SECONDS)
.until(() -> {
myJobMaintenanceService.runMaintenancePass();
return myJobCoordinator.getInstance(theId).getStatus();
}, equalTo(StatusEnum.COMPLETED));
.atMost(theSecondsToWait, TimeUnit.SECONDS)
.until(() -> checkStatusWithMaintenancePass(theId, theExpectedStatus));
} catch (ConditionTimeoutException e) {
String statuses = myJobPersistence.fetchInstances(100, 0)
.stream()
@ -88,53 +97,40 @@ public class Batch2JobHelper {
return myJobCoordinator.getInstance(theId);
}
public void awaitSingleChunkJobCompletion(Batch2JobStartResponse theStartResponse) {
awaitSingleChunkJobCompletion(theStartResponse.getJobId());
private boolean checkStatusWithMaintenancePass(String theId, StatusEnum... theExpectedStatuses) {
if (hasStatus(theId, theExpectedStatuses)) {
return true;
}
myJobMaintenanceService.runMaintenancePass();
return hasStatus(theId, theExpectedStatuses);
}
public void awaitSingleChunkJobCompletion(String theId) {
await().until(() -> myJobCoordinator.getInstance(theId).getStatus() == StatusEnum.COMPLETED);
private boolean hasStatus(String theId, StatusEnum[] theExpectedStatuses) {
return ArrayUtils.contains(theExpectedStatuses, getStatus(theId));
}
private StatusEnum getStatus(String theId) {
return myJobCoordinator.getInstance(theId).getStatus();
}
public JobInstance awaitJobFailure(Batch2JobStartResponse theStartResponse) {
return awaitJobFailure(theStartResponse.getJobId());
}
public JobInstance awaitJobFailure(String theId) {
await().until(() -> {
myJobMaintenanceService.runMaintenancePass();
return myJobCoordinator.getInstance(theId).getStatus();
}, Matchers.anyOf(equalTo(StatusEnum.ERRORED), equalTo(StatusEnum.FAILED)));
return myJobCoordinator.getInstance(theId);
}
public void awaitJobCancelled(String theId) {
await().until(() -> {
myJobMaintenanceService.runMaintenancePass();
return myJobCoordinator.getInstance(theId).getStatus();
}, equalTo(StatusEnum.CANCELLED));
}
public JobInstance awaitJobHitsStatusInTime(String theId, int theSeconds, StatusEnum... theStatuses) {
await().atMost(theSeconds, TimeUnit.SECONDS)
.pollDelay(Duration.ofSeconds(10))
.until(() -> {
myJobMaintenanceService.runMaintenancePass();
return myJobCoordinator.getInstance(theId).getStatus();
}, Matchers.in(theStatuses));
return myJobCoordinator.getInstance(theId);
public JobInstance awaitJobFailure(String theJobId) {
return awaitJobHasStatus(theJobId, StatusEnum.ERRORED, StatusEnum.FAILED);
}
public void awaitJobInProgress(String theId) {
await().until(() -> {
myJobMaintenanceService.runMaintenancePass();
return myJobCoordinator.getInstance(theId).getStatus();
}, equalTo(StatusEnum.IN_PROGRESS));
await().until(() -> checkStatusWithMaintenancePass(theId, StatusEnum.IN_PROGRESS));
}
public void assertNoGatedStep(String theInstanceId) {
assertNull(myJobCoordinator.getInstance(theInstanceId).getCurrentGatedStepId());
public void assertNotFastTracking(String theInstanceId) {
assertFalse(myJobCoordinator.getInstance(theInstanceId).isFastTracking());
}
public void assertFastTracking(String theInstanceId) {
assertTrue(myJobCoordinator.getInstance(theInstanceId).isFastTracking());
}
public void awaitGatedStepId(String theExpectedGatedStepId, String theInstanceId) {
@ -182,4 +178,40 @@ public class Batch2JobHelper {
public List<JobInstance> findJobsByDefinition(String theJobDefinitionId) {
return myJobCoordinator.getInstancesbyJobDefinitionIdAndEndedStatus(theJobDefinitionId, null, 100, 0);
}
public void awaitNoJobsRunning() {
awaitNoJobsRunning(false);
}
public void awaitNoJobsRunning(boolean theExpectAtLeastOneJobToExist) {
HashMap<String, String> map = new HashMap<>();
Awaitility.await().atMost(10, TimeUnit.SECONDS)
.until(() -> {
myJobMaintenanceService.runMaintenancePass();
List<JobInstance> jobs = myJobCoordinator.getInstances(1000, 1);
// "All Jobs" assumes at least one job exists
if (theExpectAtLeastOneJobToExist && jobs.isEmpty()) {
ourLog.warn("No jobs found yet...");
return false;
}
for (JobInstance job : jobs) {
if (job.getStatus() != StatusEnum.COMPLETED) {
map.put(job.getInstanceId(), job.getStatus().name());
} else {
map.remove(job.getInstanceId());
}
}
return map.isEmpty();
});
String msg = map.entrySet().stream().map(e -> e.getKey() + "=" + e.getValue()).collect(Collectors.joining(", \n "));
ourLog.info("The following jobs did not complete as expected: {}", msg);
}
public void runMaintenancePass() {
myJobMaintenanceService.runMaintenancePass();
}
}

View File

@ -20,6 +20,9 @@ package ca.uhn.fhir.jpa.test.config;
* #L%
*/
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.binary.api.IBinaryStorageSvc;
import ca.uhn.fhir.jpa.binstore.MemoryBinaryStorageSvcImpl;
@ -104,8 +107,8 @@ public class TestJPAConfig {
}
@Bean
public Batch2JobHelper batch2JobHelper() {
return new Batch2JobHelper();
public Batch2JobHelper batch2JobHelper(IJobMaintenanceService theJobMaintenanceService, IJobCoordinator theJobCoordinator, IJobPersistence theJobPersistence) {
return new Batch2JobHelper(theJobMaintenanceService, theJobCoordinator, theJobPersistence);
}
@Bean

View File

@ -3,6 +3,7 @@ package ca.uhn.fhir.jpa.sched;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.util.StopWatch;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -25,6 +26,8 @@ import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.test.util.AopTestUtils;
import java.util.concurrent.atomic.AtomicInteger;
import static ca.uhn.fhir.util.TestUtil.sleepAtLeast;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
@ -33,12 +36,14 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.jupiter.api.Assertions.fail;
@ContextConfiguration(classes = SchedulerServiceImplTest.TestConfiguration.class)
@ContextConfiguration(classes = SchedulerServiceImplIT.TestConfiguration.class)
@ExtendWith(SpringExtension.class)
@DirtiesContext
public class SchedulerServiceImplTest {
public class SchedulerServiceImplIT {
private static final Logger ourLog = LoggerFactory.getLogger(SchedulerServiceImplTest.class);
private static final Logger ourLog = LoggerFactory.getLogger(SchedulerServiceImplIT.class);
public static final String SCHEDULED_JOB_ID = CountingJob.class.getName();
private static final AtomicInteger ourNameCounter = new AtomicInteger();
private static long ourTaskDelay;
@Autowired
private ISchedulerService mySvc;
@ -51,60 +56,97 @@ public class SchedulerServiceImplTest {
@Test
public void testScheduleTask() {
ScheduledJobDefinition def = new ScheduledJobDefinition()
.setId(CountingJob.class.getName())
.setJobClass(CountingJob.class);
ScheduledJobDefinition def = buildJobDefinition();
StopWatch sw = new StopWatch();
mySvc.scheduleLocalJob(100, def);
sleepAtLeast(1000);
await().until(CountingJob.ourCount::get, greaterThan(5));
ourLog.info("Fired {} times", CountingJob.ourCount);
ourLog.info("Fired {} times in {}", CountingJob.ourCount, sw);
assertThat(sw.getMillis(), greaterThan(500L));
assertThat(sw.getMillis(), lessThan(1000L));
}
assertThat(CountingJob.ourCount, greaterThan(3));
assertThat(CountingJob.ourCount, lessThan(20));
@Test
public void triggerImmediately_runsJob() {
ScheduledJobDefinition def = buildJobDefinition();
StopWatch sw = new StopWatch();
mySvc.scheduleLocalJob(100, def);
for (int i = 0; i < 20; ++i) {
mySvc.triggerLocalJobImmediately(def);
}
await().until(CountingJob.ourCount::get, greaterThan(25));
ourLog.info("Fired {} times in {}", CountingJob.ourCount, sw);
assertThat(sw.getMillis(), greaterThan(500L));
assertThat(sw.getMillis(), lessThan(1000L));
}
private static ScheduledJobDefinition buildJobDefinition() {
return new ScheduledJobDefinition()
.setId(SCHEDULED_JOB_ID + ourNameCounter.incrementAndGet())
.setJobClass(CountingJob.class);
}
@Test
public void testStopAndStartService() throws SchedulerException {
ScheduledJobDefinition def = new ScheduledJobDefinition()
.setId(CountingJob.class.getName())
.setJobClass(CountingJob.class);
ScheduledJobDefinition def = buildJobDefinition();
BaseSchedulerServiceImpl svc = AopTestUtils.getTargetObject(mySvc);
svc.stop();
svc.create();
svc.start();
StopWatch sw = new StopWatch();
mySvc.scheduleLocalJob(100, def);
sleepAtLeast(1000);
await().until(CountingJob.ourCount::get, greaterThan(5));
ourLog.info("Fired {} times", CountingJob.ourCount);
await().until(() -> CountingJob.ourCount, greaterThan(3));
assertThat(CountingJob.ourCount, lessThan(50));
ourLog.info("Fired {} times in {}", CountingJob.ourCount, sw);
assertThat(sw.getMillis(), greaterThan(0L));
assertThat(sw.getMillis(), lessThan(1000L));
}
@Test
public void testScheduleTaskLongRunningDoesntRunConcurrently() {
ScheduledJobDefinition def = new ScheduledJobDefinition()
.setId(CountingJob.class.getName())
.setJobClass(CountingJob.class);
ScheduledJobDefinition def = buildJobDefinition();
ourTaskDelay = 500;
StopWatch sw = new StopWatch();
mySvc.scheduleLocalJob(100, def);
sleepAtLeast(1000);
await().until(CountingJob.ourCount::get, greaterThan(5));
ourLog.info("Fired {} times", CountingJob.ourCount);
await().until(() -> CountingJob.ourCount, greaterThanOrEqualTo(1));
assertThat(CountingJob.ourCount, lessThan(5));
ourLog.info("Fired {} times in {}", CountingJob.ourCount, sw);
assertThat(sw.getMillis(), greaterThan(3000L));
assertThat(sw.getMillis(), lessThan(3500L));
}
@Test
public void testScheduleTaskLongRunningDoesntRunConcurrentlyWithTrigger() {
ScheduledJobDefinition def = buildJobDefinition();
ourTaskDelay = 500;
StopWatch sw = new StopWatch();
mySvc.scheduleLocalJob(100, def);
mySvc.triggerLocalJobImmediately(def);
mySvc.triggerLocalJobImmediately(def);
await().until(CountingJob.ourCount::get, greaterThan(5));
ourLog.info("Fired {} times in {}", CountingJob.ourCount, sw);
assertThat(sw.getMillis(), greaterThan(3000L));
assertThat(sw.getMillis(), lessThan(3500L));
}
@Test
public void testIntervalJob() {
@ -125,7 +167,7 @@ public class SchedulerServiceImplTest {
@AfterEach
public void after() throws SchedulerException {
CountingJob.ourCount = 0;
CountingJob.resetCount();
CountingIntervalJob.ourCount = 0;
mySvc.purgeAllScheduledJobsForUnitTest();
}
@ -133,15 +175,24 @@ public class SchedulerServiceImplTest {
@DisallowConcurrentExecution
public static class CountingJob implements Job, ApplicationContextAware {
private static int ourCount;
private static AtomicInteger ourCount = new AtomicInteger();
private static boolean ourRunning = false;
@Autowired
@Qualifier("stringBean")
private String myStringBean;
private ApplicationContext myAppCtx;
public static void resetCount() {
ourCount = new AtomicInteger();
}
@Override
public void execute(JobExecutionContext theContext) {
if (ourRunning) {
fail();
}
ourRunning = true;
if (!"String beans are good.".equals(myStringBean)) {
fail("Did not autowire stringBean correctly, found: " + myStringBean);
}
@ -155,7 +206,8 @@ public class SchedulerServiceImplTest {
} else {
ourLog.info("Job has fired...");
}
ourCount++;
ourCount.incrementAndGet();
ourRunning = false;
}
@Override

View File

@ -0,0 +1,57 @@
package ca.uhn.fhir.jpa.test;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class Batch2JobHelperTest {
private static final String JOB_ID = "Batch2JobHelperTest";
@Mock
IJobMaintenanceService myJobMaintenanceService;
@Mock
IJobCoordinator myJobCoordinator;
@InjectMocks
Batch2JobHelper myBatch2JobHelper;
static JobInstance ourIncompleteInstance = new JobInstance().setStatus(StatusEnum.IN_PROGRESS);
static JobInstance ourCompleteInstance = new JobInstance().setStatus(StatusEnum.COMPLETED);
@AfterEach
void after() {
verifyNoMoreInteractions(myJobCoordinator);
verifyNoMoreInteractions(myJobMaintenanceService);
}
@Test
void awaitJobCompletion_inProgress_callsMaintenance() {
when(myJobCoordinator.getInstance(JOB_ID)).thenReturn(ourIncompleteInstance, ourCompleteInstance);
myBatch2JobHelper.awaitJobCompletion(JOB_ID);
verify(myJobMaintenanceService, times(1)).runMaintenancePass();
}
@Test
void awaitJobCompletion_alreadyComplete_doesNotCallMaintenance() {
when(myJobCoordinator.getInstance(JOB_ID)).thenReturn(ourCompleteInstance);
myBatch2JobHelper.awaitJobCompletion(JOB_ID);
verifyNoInteractions(myJobMaintenanceService);
}
}

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -7,7 +7,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -487,7 +487,7 @@ public class RestfulServer extends HttpServlet implements IRestfulServer<Servlet
if (Modifier.isStatic(m.getModifiers())) {
throw new ConfigurationException(Msg.code(291) + "Method '" + m.getName() + "' is static, FHIR RESTful methods must not be static");
}
ourLog.debug("Scanning public method: {}#{}", theProvider.getClass(), m.getName());
ourLog.trace("Scanning public method: {}#{}", theProvider.getClass(), m.getName());
String resourceName = foundMethodBinding.getResourceName();
ResourceBinding resourceBinding;
@ -523,7 +523,7 @@ public class RestfulServer extends HttpServlet implements IRestfulServer<Servlet
}
resourceBinding.addMethod(foundMethodBinding);
ourLog.debug(" * Method: {}#{} is a handler", theProvider.getClass(), m.getName());
ourLog.trace(" * Method: {}#{} is a handler", theProvider.getClass(), m.getName());
}

View File

@ -54,7 +54,6 @@ import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
@ -270,7 +269,7 @@ public class RestfulServerConfiguration implements ISearchParamRegistry {
}
String name = createNamedQueryName(methodBinding);
ourLog.debug("Detected named query: {}", name);
ourLog.trace("Detected named query: {}", name);
namedSearchMethodBindingToName.put(methodBinding, name);
if (!searchNameToBindings.containsKey(name)) {

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot-samples</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
</parent>
<artifactId>hapi-fhir-spring-boot-sample-client-apache</artifactId>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot-samples</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
</parent>
<artifactId>hapi-fhir-spring-boot-sample-client-okhttp</artifactId>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot-samples</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
</parent>
<artifactId>hapi-fhir-spring-boot-sample-server-jersey</artifactId>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
</parent>
<artifactId>hapi-fhir-spring-boot-samples</artifactId>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -109,7 +109,7 @@ public class ExpandResourcesStep implements IJobStepWorker<BulkExportJobParamete
private IBundleProvider fetchAllResources(BulkExportIdList theIds) {
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(theIds.getResourceType());
SearchParameterMap map = new SearchParameterMap();
SearchParameterMap map = SearchParameterMap.newSynchronous();
TokenOrListParam ids = new TokenOrListParam();
for (Id id : theIds.getIds()) {
ids.addOr(new TokenParam(id.toPID().getAssociatedResourceId().getValue()));

View File

@ -72,6 +72,7 @@ public class FetchResourceIdsStep implements IFirstJobStepWorker<BulkExportJobPa
// filters are the filters for searching
Iterator<ResourcePersistentId> pidIterator = myBulkExportProcessor.getResourcePidIterator(providerParams);
List<Id> idsToSubmit = new ArrayList<>();
while (pidIterator.hasNext()) {
ResourcePersistentId pid = pidIterator.next();
@ -90,11 +91,10 @@ public class FetchResourceIdsStep implements IFirstJobStepWorker<BulkExportJobPa
if (!idsToSubmit.isEmpty()) {
submitWorkChunk(idsToSubmit, resourceType, params, theDataSink);
submissionCount++;
idsToSubmit = new ArrayList<>();
}
}
} catch (Exception ex) {
ourLog.error(ex.getMessage());
ourLog.error(ex.getMessage(), ex);
theDataSink.recoveredError(ex.getMessage());

View File

@ -55,7 +55,7 @@ public class BulkImportPullConfig {
.setParametersType(Batch2BulkImportPullJobParameters.class)
.setParametersValidator(importParameterValidator())
.addFirstStep(
"ReadInResourcesStep",
"FetchPartitionedFilesStep",
"Reads an import file and extracts the resources",
BulkImportFilePartitionResult.class,
fetchPartitionedFilesStep()
@ -67,7 +67,7 @@ public class BulkImportPullConfig {
readInResourcesFromFileStep()
)
.addLastStep(
"WriteBundleStep",
"WriteBundleForImportStep",
"Parses the bundle from previous step and writes it to the dv",
writeBundleForImportStep()
)

View File

@ -68,7 +68,7 @@ public class FetchPartitionedFilesStep implements IFirstJobStepWorker<Batch2Bulk
theDataSink.accept(result);
}
ourLog.info("FetchPartitionedFilesStep complete for jobID {}", jobId);
ourLog.info("FetchPartitionedFilesStep complete for jobID {}. Submitted {} files to next step.", jobId, job.getFileCount());
return RunOutcome.SUCCESS;
}

View File

@ -241,6 +241,13 @@ public class BulkDataImportProvider {
String msg = "Job is in " + status.getStatus() + " state with " +
status.getErrorCount() + " error count. Last error: " + status.getErrorMessage();
streamOperationOutcomeResponse(response, msg, "error");
break;
}
case CANCELLED: {
response.setStatus(Constants.STATUS_HTTP_404_NOT_FOUND);
String msg = "Job was cancelled.";
streamOperationOutcomeResponse(response, msg, "information");
break;
}
}
}

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.2.0-PRE2-SNAPSHOT</version>
<version>6.2.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -61,4 +61,11 @@ public interface IJobInstance {
boolean isCancelled();
String getReport();
/**
* @return true if every step of the job has produced exactly 1 chunk.
*/
boolean isFastTracking();
void setFastTracking(boolean theFastTracking);
}

View File

@ -21,7 +21,12 @@ package ca.uhn.fhir.batch2.api;
*/
public interface IJobMaintenanceService {
/**
* Do not wait for the next scheduled time for maintenance. Trigger it immediately.
* @return true if a request to run a maintenance pass was fired, false if there was already a trigger request in queue so we can just use that one
*/
boolean triggerMaintenancePass();
void runMaintenancePass();
void runMaintenancePass();
}

View File

@ -23,10 +23,10 @@ package ca.uhn.fhir.batch2.api;
import ca.uhn.fhir.batch2.coordinator.BatchWorkChunk;
import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
import ca.uhn.fhir.batch2.model.MarkWorkChunkAsErrorRequest;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
import org.springframework.data.domain.Page;
import java.util.Iterator;
@ -86,14 +86,6 @@ public interface IJobPersistence {
*/
List<JobInstance> fetchRecentInstances(int thePageSize, int thePageIndex);
/**
* Fetch a given instance and update the stored status
* * to {@link ca.uhn.fhir.batch2.model.StatusEnum#IN_PROGRESS}
*
* @param theInstanceId The ID
*/
Optional<JobInstance> fetchInstanceAndMarkInProgress(String theInstanceId);
List<JobInstance> fetchInstancesByJobDefinitionIdAndStatus(String theJobDefinitionId, Set<StatusEnum> theRequestedStatuses, int thePageSize, int thePageIndex);
/**

View File

@ -43,6 +43,10 @@ public class ReductionStepExecutionDetails<PT extends IModelJson, IT extends IMo
super(theParameters, theData, theInstance, "VOID");
}
public ReductionStepExecutionDetails(@Nonnull PT theParameters, @Nonnull JobInstance theInstance) {
this(theParameters, null, theInstance);
}
@Override
@Nonnull
public final IT getData() {

View File

@ -26,7 +26,7 @@ import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.coordinator.JobCoordinatorImpl;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.coordinator.StepExecutionSvc;
import ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor;
import ca.uhn.fhir.batch2.maintenance.JobMaintenanceServiceImpl;
import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
@ -55,8 +55,8 @@ public abstract class BaseBatch2Config {
}
@Bean
public StepExecutionSvc jobStepExecutorService(BatchJobSender theBatchJobSender) {
return new StepExecutionSvc(myPersistence, theBatchJobSender);
public WorkChunkProcessor jobStepExecutorService(BatchJobSender theBatchJobSender) {
return new WorkChunkProcessor(myPersistence, theBatchJobSender);
}
@Bean
@ -67,21 +67,22 @@ public abstract class BaseBatch2Config {
@Bean
public IJobCoordinator batch2JobCoordinator(JobDefinitionRegistry theJobDefinitionRegistry,
BatchJobSender theBatchJobSender,
StepExecutionSvc theExecutor) {
WorkChunkProcessor theExecutor,
IJobMaintenanceService theJobMaintenanceService) {
return new JobCoordinatorImpl(
theBatchJobSender,
batch2ProcessingChannelReceiver(myChannelFactory),
myPersistence,
theJobDefinitionRegistry,
theExecutor
);
theExecutor,
theJobMaintenanceService);
}
@Bean
public IJobMaintenanceService batch2JobMaintenanceService(ISchedulerService theSchedulerService,
JobDefinitionRegistry theJobDefinitionRegistry,
BatchJobSender theBatchJobSender,
StepExecutionSvc theExecutor
WorkChunkProcessor theExecutor
) {
return new JobMaintenanceServiceImpl(theSchedulerService,
myPersistence,

View File

@ -22,8 +22,8 @@ package ca.uhn.fhir.batch2.config;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.jpa.batch.log.Logs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
@ -31,7 +31,7 @@ import javax.annotation.PostConstruct;
import java.util.Map;
public class Batch2JobRegisterer {
private static final Logger ourLog = LoggerFactory.getLogger(Batch2JobRegisterer.class);
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
@Autowired
private ApplicationContext myApplicationContext;

View File

@ -23,12 +23,12 @@ package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.jpa.batch.log.Logs;
import ca.uhn.fhir.model.api.IModelJson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
abstract class BaseDataSink<PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> implements IJobDataSink<OT> {
private static final Logger ourLog = LoggerFactory.getLogger(BaseDataSink.class);
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
private final String myInstanceId;
private final JobWorkCursor<PT,IT,OT> myJobWorkCursor;

Some files were not shown because too many files have changed in this diff Show More