diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_2_0/4187-add-jobregistry-remove.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_2_0/4187-add-jobregistry-remove.yaml new file mode 100644 index 00000000000..587b6977623 --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_2_0/4187-add-jobregistry-remove.yaml @@ -0,0 +1,5 @@ +--- +type: add +issue: 4187 +title: "A remove method has been added to the Batch2 job registry. This will allow for dynamic job registration + in the future." diff --git a/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/config/TestDstu2Config.java b/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/config/TestDstu2Config.java index c763321ca51..3b929394a1e 100644 --- a/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/config/TestDstu2Config.java +++ b/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/config/TestDstu2Config.java @@ -106,8 +106,7 @@ public class TestDstu2Config { } retVal.setUsername(myDbUsername); retVal.setPassword(myDbPassword); - retVal.setDefaultQueryTimeout(20); - retVal.setTestOnBorrow(true); + TestR5Config.applyCommonDatasourceParams(retVal); DataSource dataSource = ProxyDataSourceBuilder .create(retVal) diff --git a/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/config/TestDstu3Config.java b/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/config/TestDstu3Config.java index a99a89ea99a..57ce001ba9f 100644 --- a/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/config/TestDstu3Config.java +++ b/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/config/TestDstu3Config.java @@ -115,8 +115,7 @@ public class TestDstu3Config { } retVal.setUsername(myDbUsername); retVal.setPassword(myDbPassword); - retVal.setDefaultQueryTimeout(20); - retVal.setTestOnBorrow(true); + TestR5Config.applyCommonDatasourceParams(retVal); DataSource dataSource = ProxyDataSourceBuilder .create(retVal) diff --git a/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/config/TestR4BConfig.java b/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/config/TestR4BConfig.java index 23a75c9b9d2..46049347eca 100644 --- a/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/config/TestR4BConfig.java +++ b/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/config/TestR4BConfig.java @@ -102,8 +102,7 @@ public class TestR4BConfig { } retVal.setUsername(myDbUsername); retVal.setPassword(myDbPassword); - retVal.setDefaultQueryTimeout(20); - retVal.setTestOnBorrow(true); + TestR5Config.applyCommonDatasourceParams(retVal); DataSource dataSource = ProxyDataSourceBuilder .create(retVal) diff --git a/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/config/TestR4Config.java b/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/config/TestR4Config.java index aa576e8ffce..48821cd3ecd 100644 --- a/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/config/TestR4Config.java +++ b/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/config/TestR4Config.java @@ -102,8 +102,7 @@ public class TestR4Config { } retVal.setUsername(myDbUsername); retVal.setPassword(myDbPassword); - retVal.setDefaultQueryTimeout(20); - retVal.setTestOnBorrow(true); + TestR5Config.applyCommonDatasourceParams(retVal); DataSource dataSource = ProxyDataSourceBuilder .create(retVal) diff --git a/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/config/TestR5Config.java b/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/config/TestR5Config.java index 7ebbc908618..6dd2d4edc31 100644 --- a/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/config/TestR5Config.java +++ b/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/config/TestR5Config.java @@ -107,8 +107,7 @@ public class TestR5Config { } retVal.setUsername(myDbUsername); retVal.setPassword(myDbPassword); - retVal.setDefaultQueryTimeout(20); - retVal.setTestOnBorrow(true); + applyCommonDatasourceParams(retVal); DataSource dataSource = ProxyDataSourceBuilder .create(retVal) @@ -121,6 +120,13 @@ public class TestR5Config { return dataSource; } + static void applyCommonDatasourceParams(BasicDataSource retVal) { + retVal.setDefaultQueryTimeout(20); + retVal.setTestOnBorrow(true); + retVal.setMaxTotal(50); + retVal.setMaxWaitMillis(25000); + } + // TODO KHS there is code duplication between this and the other Test*Config classes in this directory @Bean public DatabaseBackedPagingProvider databaseBackedPagingProvider() { diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobDefinitionRegistry.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobDefinitionRegistry.java index f25a680f08c..d43b3576d7b 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobDefinitionRegistry.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobDefinitionRegistry.java @@ -28,14 +28,17 @@ import ca.uhn.fhir.i18n.Msg; import ca.uhn.fhir.jpa.batch.log.Logs; import ca.uhn.fhir.model.api.IModelJson; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; +import com.google.common.collect.ImmutableSortedMap; import org.apache.commons.lang3.Validate; import org.slf4j.Logger; import javax.annotation.Nonnull; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.NavigableMap; import java.util.Optional; import java.util.Set; import java.util.TreeMap; @@ -44,15 +47,15 @@ import java.util.stream.Collectors; public class JobDefinitionRegistry { private static final Logger ourLog = Logs.getBatchTroubleshootingLog(); - private final Map>> myJobs = new HashMap<>(); + private volatile Map>> myJobs = new HashMap<>(); /** * Add a job definition only if it is not registered - * @param theDefinition - * @return true if it did not already exist and was registered + * * @param the job parameter type for the definition + * @return true if it did not already exist and was registered */ - public boolean addJobDefinitionIfNotRegistered(@Nonnull JobDefinition theDefinition) { + public synchronized boolean addJobDefinitionIfNotRegistered(@Nonnull JobDefinition theDefinition) { Optional> orig = getJobDefinition(theDefinition.getJobDefinitionId(), theDefinition.getJobDefinitionVersion()); if (orig.isPresent()) { return false; @@ -61,7 +64,7 @@ public class JobDefinitionRegistry { return true; } - public void addJobDefinition(@Nonnull JobDefinition theDefinition) { + public synchronized void addJobDefinition(@Nonnull JobDefinition theDefinition) { Validate.notNull(theDefinition); String jobDefinitionId = theDefinition.getJobDefinitionId(); Validate.notBlank(jobDefinitionId); @@ -75,7 +78,8 @@ public class JobDefinitionRegistry { } } - TreeMap> versionMap = myJobs.computeIfAbsent(jobDefinitionId, t -> new TreeMap<>()); + Map>> newJobsMap = cloneJobsMap(); + NavigableMap> versionMap = newJobsMap.computeIfAbsent(jobDefinitionId, t -> new TreeMap<>()); if (versionMap.containsKey(theDefinition.getJobDefinitionVersion())) { if (versionMap.get(theDefinition.getJobDefinitionVersion()) == theDefinition) { ourLog.warn("job[{}] version: {} already registered. Not registering again.", jobDefinitionId, theDefinition.getJobDefinitionVersion()); @@ -84,10 +88,37 @@ public class JobDefinitionRegistry { throw new ConfigurationException(Msg.code(2047) + "Multiple definitions for job[" + jobDefinitionId + "] version: " + theDefinition.getJobDefinitionVersion()); } versionMap.put(theDefinition.getJobDefinitionVersion(), theDefinition); + + myJobs = newJobsMap; + } + + public synchronized void removeJobDefinition(@Nonnull String theDefinitionId, int theVersion) { + Validate.notBlank(theDefinitionId); + Validate.isTrue(theVersion >= 1); + + Map>> newJobsMap = cloneJobsMap(); + NavigableMap> versionMap = newJobsMap.get(theDefinitionId); + if (versionMap != null) { + versionMap.remove(theVersion); + if (versionMap.isEmpty()) { + newJobsMap.remove(theDefinitionId); + } + } + + myJobs = newJobsMap; + } + + @Nonnull + private Map>> cloneJobsMap() { + Map>> newJobsMap = new HashMap<>(); + for (Map.Entry>> nextEntry : myJobs.entrySet()) { + newJobsMap.put(nextEntry.getKey(), new TreeMap<>(nextEntry.getValue())); + } + return newJobsMap; } public Optional> getLatestJobDefinition(@Nonnull String theJobDefinitionId) { - TreeMap> versionMap = myJobs.get(theJobDefinitionId); + NavigableMap> versionMap = myJobs.get(theJobDefinitionId); if (versionMap == null || versionMap.isEmpty()) { return Optional.empty(); } @@ -95,7 +126,7 @@ public class JobDefinitionRegistry { } public Optional> getJobDefinition(@Nonnull String theJobDefinitionId, int theJobDefinitionVersion) { - TreeMap> versionMap = myJobs.get(theJobDefinitionId); + NavigableMap> versionMap = myJobs.get(theJobDefinitionId); if (versionMap == null || versionMap.isEmpty()) { return Optional.empty(); } @@ -138,4 +169,8 @@ public class JobDefinitionRegistry { public JobDefinition getJobDefinitionOrThrowException(JobInstance theJobInstance) { return getJobDefinitionOrThrowException(theJobInstance.getJobDefinitionId(), theJobInstance.getJobDefinitionVersion()); } + + public Collection getJobDefinitionVersions(String theDefinitionId) { + return myJobs.getOrDefault(theDefinitionId, ImmutableSortedMap.of()).keySet(); + } } diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobDefinitionRegistryTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobDefinitionRegistryTest.java index 3ade1d6ad93..dfbdf3e9c8f 100644 --- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobDefinitionRegistryTest.java +++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobDefinitionRegistryTest.java @@ -11,6 +11,9 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; @@ -122,5 +125,16 @@ class JobDefinitionRegistryTest { } } + @Test + public void testRemoveJobDefinition() { + mySvc.removeJobDefinition("A", 1); + + assertThat(mySvc.getJobDefinitionIds(), containsInAnyOrder("A")); + assertThat(mySvc.getJobDefinitionVersions("A"), containsInAnyOrder(2)); + + mySvc.removeJobDefinition("A", 2); + assertThat(mySvc.getJobDefinitionIds(), empty()); + } + }