James Agnew 2023-04-19 13:31:18 -04:00
parent 8b751aa30f
commit fd11cb0777
11 changed files with 92 additions and 35 deletions

View File

@@ -23,6 +23,7 @@ package ca.uhn.fhir.jpa.config.r4;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.FhirVersionEnum;
import ca.uhn.fhir.context.ParserOptions;
+import ca.uhn.fhir.rest.client.api.IRestfulClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
@@ -53,6 +54,14 @@ public class FhirContextR4Config {
parserOptions.setDontStripVersionsFromReferencesAtPaths(DEFAULT_PRESERVE_VERSION_REFS_R4_AND_LATER);
}
+// We use this context to create subscription deliveries and that kind of thing. It doesn't
+// make much sense to let the HTTP client pool be a blocker since we have delivery queue
+// sizing as the lever to affect that. So make the pool big enough that it shouldn't get
+// in the way.
+IRestfulClientFactory clientFactory = theFhirContext.getRestfulClientFactory();
+clientFactory.setPoolMaxPerRoute(1000);
+clientFactory.setPoolMaxTotal(1000);
return theFhirContext;
}
}
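These pool limits live on the context's restful client factory, so they apply to every client subsequently created from the same FhirContext. A minimal sketch of the same configuration outside Spring (the endpoint URL is a placeholder):

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.client.api.IRestfulClientFactory;

public class ClientPoolConfigExample {
    public static void main(String[] args) {
        FhirContext ctx = FhirContext.forR4Cached();

        // Pool limits are set once on the factory and inherited by every
        // client this context creates afterwards.
        IRestfulClientFactory clientFactory = ctx.getRestfulClientFactory();
        clientFactory.setPoolMaxPerRoute(1000);
        clientFactory.setPoolMaxTotal(1000);

        // Placeholder endpoint; deliveries through this client now draw
        // from the enlarged pool rather than queueing on it.
        IGenericClient client = ctx.newRestfulGenericClient("http://example.com/fhir");
    }
}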

View File

@@ -175,7 +175,7 @@ public class TermLoaderSvcImpl implements ITermLoaderSvc {
private static final int LOG_INCREMENT = 1000;
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(TermLoaderSvcImpl.class);
// FYI: Hardcoded to R4 because that's what the term svc uses internally
-private final FhirContext myCtx = FhirContext.forR4();
+private final FhirContext myCtx = FhirContext.forR4Cached();
private final ITermDeferredStorageSvc myDeferredStorageSvc;
private final ITermCodeSystemStorageSvc myCodeSystemStorageSvc;
@@ -500,7 +500,7 @@ public class TermLoaderSvcImpl implements ITermLoaderSvc {
CodeSystem imgthlaCs;
try {
String imgthlaCsString = IOUtils.toString(TermReadSvcImpl.class.getResourceAsStream("/ca/uhn/fhir/jpa/term/imgthla/imgthla.xml"), Charsets.UTF_8);
-imgthlaCs = FhirContext.forR4().newXmlParser().parseResource(CodeSystem.class, imgthlaCsString);
+imgthlaCs = FhirContext.forR4Cached().newXmlParser().parseResource(CodeSystem.class, imgthlaCsString);
} catch (IOException e) {
throw new InternalErrorException(Msg.code(869) + "Failed to load imgthla.xml", e);
}
@@ -603,7 +603,7 @@ public class TermLoaderSvcImpl implements ITermLoaderSvc {
throw new InvalidRequestException(Msg.code(875) + "Did not find loinc.xml in the ZIP distribution.");
}
-CodeSystem loincCs = FhirContext.forR4().newXmlParser().parseResource(CodeSystem.class, loincCsString);
+CodeSystem loincCs = FhirContext.forR4Cached().newXmlParser().parseResource(CodeSystem.class, loincCsString);
if (isNotBlank(loincCs.getVersion())) {
throw new InvalidRequestException(Msg.code(876) + "'loinc.xml' file must not have a version defined. To define a version use '" +
LOINC_CODESYSTEM_VERSION.getCode() + "' property of 'loincupload.properties' file");
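The forR4() to forR4Cached() swaps above matter because constructing a FhirContext is comparatively expensive (each new instance re-scans the resource model on first use), and these call sites run on every terminology upload. A minimal sketch of the difference between the two factory methods:

import ca.uhn.fhir.context.FhirContext;

public class CachedContextExample {
    public static void main(String[] args) {
        // forR4() builds a brand-new context on every call.
        System.out.println(FhirContext.forR4() == FhirContext.forR4()); // false

        // forR4Cached() returns one shared instance for the whole process.
        System.out.println(FhirContext.forR4Cached() == FhirContext.forR4Cached()); // true
    }
}

The trade-off is that the cached instance is shared, so any configuration applied to it (parser options, client factory settings) is visible to every other caller of forR4Cached().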

View File

@@ -65,7 +65,7 @@ public class MemberMatcherR4HelperTest {
@RegisterExtension
LogbackCaptureTestExtension myLogCapture = new LogbackCaptureTestExtension((Logger) MemberMatcherR4Helper.ourLog, Level.TRACE);
@Spy
-private final FhirContext myFhirContext = FhirContext.forR4();
+private final FhirContext myFhirContext = FhirContext.forR4Cached();
@Mock
private IFhirResourceDao<Coverage> myCoverageDao;
@Mock

View File

@@ -34,6 +34,7 @@ import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.api.RequestTypeEnum;
+import ca.uhn.fhir.rest.client.apache.ApacheRestfulClientFactory;
import ca.uhn.fhir.rest.client.api.Header;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.client.api.IHttpClient;
@@ -56,6 +57,7 @@ import org.springframework.context.annotation.Scope;
import org.springframework.messaging.MessagingException;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;

View File

@@ -4,6 +4,7 @@ import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
+import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexStep;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
@@ -57,7 +58,7 @@ public class ReindexStepTest extends BaseJpaR4Test {
// Execute
myCaptureQueriesListener.clear();
-RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id");
+RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", new ReindexJobParameters());
// Verify
assertEquals(2, outcome.getRecordsProcessed());
@@ -87,7 +88,7 @@ public class ReindexStepTest extends BaseJpaR4Test {
// Execute
myCaptureQueriesListener.clear();
-RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id");
+RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", new ReindexJobParameters());
// Verify
assertEquals(2, outcome.getRecordsProcessed());
@@ -120,7 +121,7 @@ public class ReindexStepTest extends BaseJpaR4Test {
// Execute
myCaptureQueriesListener.clear();
-RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id");
+RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", new ReindexJobParameters());
// Verify
assertEquals(2, outcome.getRecordsProcessed());
@@ -188,7 +189,7 @@ public class ReindexStepTest extends BaseJpaR4Test {
// Execute
myCaptureQueriesListener.clear();
-RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id");
+RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", new ReindexJobParameters());
// Verify
assertEquals(2, outcome.getRecordsProcessed());
@@ -233,7 +234,7 @@ public class ReindexStepTest extends BaseJpaR4Test {
// Execute
myCaptureQueriesListener.clear();
-RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id");
+RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", new ReindexJobParameters());
// Verify
assertEquals(4, outcome.getRecordsProcessed());

View File

@@ -67,9 +67,11 @@ public class ReindexAppCtx {
.build();
}
+// TODO: We don't use a parameterized version of the GenerateRangeChunksStep which means
+// we're not getting type checking. This should be cleaned up.
@Bean
public GenerateRangeChunksStep reindexGenerateRangeChunksStep() {
-return new GenerateRangeChunksStep();
+return new ReindexGenerateRangeChunksStep();
}
@Bean

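The cleanup that TODO describes would presumably look something like the following sketch (hypothetical, not part of this commit): declaring the bean through the parameterized supertype so the compiler checks the job-parameters type.

@Bean
public GenerateRangeChunksStep<ReindexJobParameters> reindexGenerateRangeChunksStep() {
    // Hypothetical cleanup sketched from the TODO above: exposing the
    // parameterized supertype restores compile-time type checking.
    return new ReindexGenerateRangeChunksStep();
}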
View File

@@ -1,19 +0,0 @@
/*-
* #%L
* hapi-fhir-storage-batch2-jobs
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/

View File

@@ -0,0 +1,27 @@
package ca.uhn.fhir.batch2.jobs.reindex;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.chunk.PartitionedUrlChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.step.GenerateRangeChunksStep;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
public class ReindexGenerateRangeChunksStep extends GenerateRangeChunksStep<ReindexJobParameters> {
private static final Logger ourLog = LoggerFactory.getLogger(ReindexGenerateRangeChunksStep.class);
@Nonnull
@Override
public RunOutcome run(@Nonnull StepExecutionDetails<ReindexJobParameters, VoidModel> theStepExecutionDetails, @Nonnull IJobDataSink<PartitionedUrlChunkRangeJson> theDataSink) throws JobExecutionFailedException {
ReindexJobParameters parameters = theStepExecutionDetails.getParameters();
ourLog.info("Beginning reindex job - OptimizeStorage[{}] - ReindexSearchParameters[{}]", parameters.isOptimizeStorage(), parameters.isReindexSearchParameters());
return super.run(theStepExecutionDetails, theDataSink);
}
}

View File

@@ -21,7 +21,36 @@ package ca.uhn.fhir.batch2.jobs.reindex;
*/
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlListJobParameters;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import javax.annotation.Nullable;
+import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
public class ReindexJobParameters extends PartitionedUrlListJobParameters {
@JsonProperty(value = "optimizeStorage", defaultValue = "false", required = false)
@Nullable
private Boolean myOptimizeStorage;
@JsonProperty(value = "reindexSearchParameters", defaultValue = "true", required = false)
@Nullable
private Boolean myReindexSearchParameters;
@Nullable
public boolean isOptimizeStorage() {
return defaultIfNull(myOptimizeStorage, Boolean.FALSE);
}
public void setOptimizeStorage(boolean myOptimizeStorage) {
this.myOptimizeStorage = myOptimizeStorage;
}
public boolean isReindexSearchParameters() {
return defaultIfNull(myReindexSearchParameters, Boolean.TRUE);
}
public void setReindexSearchParameters(boolean myReindexSearchParameters) {
this.myReindexSearchParameters = myReindexSearchParameters;
}
}
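Because both flags are stored as nullable Booleans and the getters supply the defaults, parameter JSON written before this commit (which contains neither field) keeps its old behavior: search parameter reindexing stays on, storage optimization stays off. A minimal sketch of that fallback, assuming a plain Jackson ObjectMapper can deserialize the parameters class:

import com.fasterxml.jackson.databind.ObjectMapper;

public class ReindexParametersDefaultsExample {
    public static void main(String[] args) throws Exception {
        // Pre-existing job instances carry parameter JSON with neither flag set.
        ReindexJobParameters params = new ObjectMapper().readValue("{}", ReindexJobParameters.class);

        System.out.println(params.isReindexSearchParameters()); // true  (defaultIfNull -> Boolean.TRUE)
        System.out.println(params.isOptimizeStorage());         // false (defaultIfNull -> Boolean.FALSE)
    }
}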

View File

@@ -74,17 +74,18 @@ public class ReindexStep implements IJobStepWorker<ReindexJobParameters, Resourc
public RunOutcome run(@Nonnull StepExecutionDetails<ReindexJobParameters, ResourceIdListWorkChunkJson> theStepExecutionDetails, @Nonnull IJobDataSink<VoidModel> theDataSink) throws JobExecutionFailedException {
ResourceIdListWorkChunkJson data = theStepExecutionDetails.getData();
+ReindexJobParameters jobParameters = theStepExecutionDetails.getParameters();
-return doReindex(data, theDataSink, theStepExecutionDetails.getInstance().getInstanceId(), theStepExecutionDetails.getChunkId());
+return doReindex(data, theDataSink, theStepExecutionDetails.getInstance().getInstanceId(), theStepExecutionDetails.getChunkId(), jobParameters);
}
@Nonnull
-public RunOutcome doReindex(ResourceIdListWorkChunkJson data, IJobDataSink<VoidModel> theDataSink, String theInstanceId, String theChunkId) {
+public RunOutcome doReindex(ResourceIdListWorkChunkJson data, IJobDataSink<VoidModel> theDataSink, String theInstanceId, String theChunkId, ReindexJobParameters theJobParameters) {
RequestDetails requestDetails = new SystemRequestDetails();
requestDetails.setRetry(true);
requestDetails.setMaxRetries(REINDEX_MAX_RETRIES);
TransactionDetails transactionDetails = new TransactionDetails();
-myHapiTransactionService.execute(requestDetails, transactionDetails, new ReindexJob(data, requestDetails, transactionDetails, theDataSink, theInstanceId, theChunkId));
+myHapiTransactionService.execute(requestDetails, transactionDetails, new ReindexJob(data, requestDetails, transactionDetails, theDataSink, theInstanceId, theChunkId, theJobParameters));
return new RunOutcome(data.size());
}
@@ -96,14 +97,16 @@ public class ReindexStep implements IJobStepWorker<ReindexJobParameters, Resourc
private final IJobDataSink<VoidModel> myDataSink;
private final String myChunkId;
private final String myInstanceId;
+private final ReindexJobParameters myJobParameters;
-public ReindexJob(ResourceIdListWorkChunkJson theData, RequestDetails theRequestDetails, TransactionDetails theTransactionDetails, IJobDataSink<VoidModel> theDataSink, String theInstanceId, String theChunkId) {
+public ReindexJob(ResourceIdListWorkChunkJson theData, RequestDetails theRequestDetails, TransactionDetails theTransactionDetails, IJobDataSink<VoidModel> theDataSink, String theInstanceId, String theChunkId, ReindexJobParameters theJobParameters) {
myData = theData;
myRequestDetails = theRequestDetails;
myTransactionDetails = theTransactionDetails;
myDataSink = theDataSink;
myInstanceId = theInstanceId;
myChunkId = theChunkId;
+myJobParameters = theJobParameters;
}
@Override
@@ -128,9 +131,10 @@ public class ReindexStep implements IJobStepWorker<ReindexJobParameters, Resourc
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(nextResourceType);
IResourcePersistentId<?> resourcePersistentId = persistentIds.get(i);
try {
-if (false) {
+if (myJobParameters.isReindexSearchParameters()) {
dao.reindex(resourcePersistentId, myRequestDetails, myTransactionDetails);
-} else {
+}
+if (myJobParameters.isOptimizeStorage()) {
dao.migrateLogToVarChar(resourcePersistentId);
}
} catch (BaseServerResponseException | DataFormatException e) {

View File

@@ -48,6 +48,8 @@ public class GenerateRangeChunksStep<PT extends PartitionedUrlListJobParameters>
Date start = BATCH_START_DATE;
Date end = new Date();
+PT params = theStepExecutionDetails.getParameters();
if (params.getPartitionedUrls().isEmpty()) {
ourLog.info("Searching for All Resources from {} to {}", start, end);
PartitionedUrlChunkRangeJson nextRange = new PartitionedUrlChunkRangeJson();