Merge pull request #3028 from hapifhir/ks-20210923-bulk-export-batch

Support non-JPA batch
Tadgh authored on 2021-09-24 08:56:08 -04:00, committed by GitHub
commit 79a3fb19b2
14 changed files with 47 additions and 31 deletions


@@ -54,6 +54,8 @@ public final class BatchConstants {
* MDM Clear
*/
public static final String MDM_CLEAR_JOB_NAME = "mdmClearJob";
public static final String BULK_EXPORT_READ_CHUNK_PARAMETER = "readChunkSize";
public static final String BULK_EXPORT_GROUP_ID_PARAMETER = "groupId";
/**
* This Set contains the step names across all job types that are appropriate for
* someone to look at the write count for that given step in order to determine the
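
The two constants added above name the job parameters that the bulk export batch job reads. As a rough sketch of how a caller might supply them (illustrative only, not part of this change; the class name and the parameter types are assumptions), a Spring Batch JobParameters object carrying both values could be built like this:

// Illustrative sketch, not part of this change.
import ca.uhn.fhir.jpa.batch.config.BatchConstants;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;

public class BulkExportJobParametersSketch {

    // Assumed parameter types: String for the group ID, Long for the read chunk size.
    public static JobParameters forGroupExport(String theGroupId, long theReadChunkSize) {
        return new JobParametersBuilder()
            .addString(BatchConstants.BULK_EXPORT_GROUP_ID_PARAMETER, theGroupId)
            .addLong(BatchConstants.BULK_EXPORT_READ_CHUNK_PARAMETER, theReadChunkSize)
            .toJobParameters();
    }
}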


@@ -123,6 +123,7 @@ import ca.uhn.fhir.jpa.search.cache.DatabaseSearchResultCacheSvcImpl;
import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc;
import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc;
import ca.uhn.fhir.jpa.search.elastic.IndexNamePrefixLayoutStrategy;
import ca.uhn.fhir.jpa.search.reindex.BlockPolicy;
import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc;
import ca.uhn.fhir.jpa.search.reindex.ResourceReindexer;
import ca.uhn.fhir.jpa.search.reindex.ResourceReindexingSvcImpl;
@@ -403,7 +404,7 @@ public abstract class BaseConfig {
asyncTaskExecutor.setQueueCapacity(0);
asyncTaskExecutor.setAllowCoreThreadTimeOut(true);
asyncTaskExecutor.setThreadNamePrefix("JobLauncher-");
asyncTaskExecutor.setRejectedExecutionHandler(new ResourceReindexingSvcImpl.BlockPolicy());
asyncTaskExecutor.setRejectedExecutionHandler(new BlockPolicy());
asyncTaskExecutor.initialize();
return asyncTaskExecutor;
}
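
For context on the executor settings shown above: with a queue capacity of 0, Spring's ThreadPoolTaskExecutor uses a direct hand-off queue, so once every launcher thread is busy a new submission is rejected, and the BlockPolicy handler then makes the submitting thread wait for a free worker instead of failing. The sketch below shows the kind of Spring Batch JobLauncher such an executor would typically back; whether HAPI wires it exactly this way is an assumption, and the class and method names are illustrative.

// Illustrative sketch, not part of this change.
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.core.task.TaskExecutor;

public class BlockingJobLauncherSketch {

    public static SimpleJobLauncher blockingJobLauncher(JobRepository theJobRepository, TaskExecutor theAsyncTaskExecutor) throws Exception {
        SimpleJobLauncher launcher = new SimpleJobLauncher();
        launcher.setJobRepository(theJobRepository);
        // The zero-queue, BlockPolicy-backed executor configured above: launching a job
        // while all launcher threads are busy blocks the caller rather than throwing
        // RejectedExecutionException.
        launcher.setTaskExecutor(theAsyncTaskExecutor);
        launcher.afterPropertiesSet();
        return launcher;
    }
}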


@@ -42,7 +42,6 @@ import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.r4.model.InstantType;
import javax.annotation.Nullable;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +53,7 @@ import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
@@ -142,29 +142,6 @@ public class ResourceReindexingSvcImpl implements IResourceReindexingSvc {
);
}
/**
* A handler for rejected tasks that will have the caller block until space is available.
* This was stolen from old hibernate search(5.X.X), as it has been removed in HS6. We can probably come up with a better solution though.
*/
public static class BlockPolicy implements RejectedExecutionHandler {
/**
* Puts the Runnable to the blocking queue, effectively blocking the delegating thread until space is available.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
try {
e.getQueue().put( r );
} catch (InterruptedException e1) {
ourLog.error("Interrupted Exception for task: {}", r, e1);
Thread.currentThread().interrupt();
}
}
}
public void scheduleJob() {
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();


@@ -34,6 +34,11 @@
<artifactId>hapi-fhir-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-batch</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-structures-dstu2</artifactId>


@@ -20,13 +20,12 @@ package ca.uhn.fhir.jpa.bulk.export.job
* #L%
*/
import ca.uhn.fhir.jpa.batch.config.BatchConstants;
import org.slf4j.Logger;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.JobParametersValidator;
import static ca.uhn.fhir.jpa.bulk.export.job.BulkExportJobConfig.*;
import static org.slf4j.LoggerFactory.getLogger;
public class GroupIdPresentValidator implements JobParametersValidator {
@@ -35,10 +34,10 @@ public class GroupIdPresentValidator implements JobParametersValidator {
@Override
public void validate(JobParameters theJobParameters) throws JobParametersInvalidException {
if (theJobParameters == null || theJobParameters.getString(GROUP_ID_PARAMETER) == null) {
throw new JobParametersInvalidException("Group Bulk Export jobs must have a " + GROUP_ID_PARAMETER + " attribute");
if (theJobParameters == null || theJobParameters.getString(BatchConstants.BULK_EXPORT_GROUP_ID_PARAMETER) == null) {
throw new JobParametersInvalidException("Group Bulk Export jobs must have a " + BatchConstants.BULK_EXPORT_GROUP_ID_PARAMETER + " attribute");
} else {
ourLog.debug("detected we are running in group mode with group id [{}]", theJobParameters.getString(GROUP_ID_PARAMETER));
ourLog.debug("detected we are running in group mode with group id [{}]", theJobParameters.getString(BatchConstants.BULK_EXPORT_GROUP_ID_PARAMETER));
}
}
}
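
To make the validator's behaviour concrete, the sketch below (illustrative, not part of this change; it assumes GroupIdPresentValidator exposes a public no-argument constructor) shows one parameter set that passes validation and one that is rejected.

// Illustrative sketch, not part of this change.
import ca.uhn.fhir.jpa.batch.config.BatchConstants;
import ca.uhn.fhir.jpa.bulk.export.job.GroupIdPresentValidator;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;

public class GroupIdPresentValidatorSketch {

    public static void main(String[] args) throws JobParametersInvalidException {
        GroupIdPresentValidator validator = new GroupIdPresentValidator();

        // Passes: the groupId parameter is present.
        validator.validate(new JobParametersBuilder()
            .addString(BatchConstants.BULK_EXPORT_GROUP_ID_PARAMETER, "Group/123")
            .toJobParameters());

        // Rejected: no groupId parameter, so the validator throws.
        try {
            validator.validate(new JobParametersBuilder().toJobParameters());
        } catch (JobParametersInvalidException e) {
            System.out.println(e.getMessage()); // "Group Bulk Export jobs must have a groupId attribute"
        }
    }
}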


@@ -21,7 +21,6 @@ package ca.uhn.fhir.jpa.bulk.export.provider
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.rest.api.server.bulk.BulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.export.model.BulkExportResponseJson;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
@@ -31,6 +30,7 @@ import ca.uhn.fhir.rest.annotation.OperationParam;
import ca.uhn.fhir.rest.api.CacheControlDirective;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.PreferHeader;
import ca.uhn.fhir.rest.api.server.bulk.BulkDataExportOptions;
import ca.uhn.fhir.rest.server.RestfulServerUtils;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;


@@ -0,0 +1,32 @@
package ca.uhn.fhir.jpa.search.reindex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
/**
* A handler for rejected tasks that will have the caller block until space is available.
* This was stolen from old hibernate search(5.X.X), as it has been removed in HS6. We can probably come up with a better solution though.
*/
// TODO KHS consolidate with the other BlockPolicy class this looks like it is a duplicate of
public class BlockPolicy implements RejectedExecutionHandler {
private static final Logger ourLog = LoggerFactory.getLogger(BlockPolicy.class);
/**
* Puts the Runnable to the blocking queue, effectively blocking the delegating thread until space is available.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
try {
e.getQueue().put(r);
} catch (InterruptedException e1) {
ourLog.error("Interrupted Exception for task: {}", r, e1);
Thread.currentThread().interrupt();
}
}
}
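
A small usage sketch of the extracted class (illustrative, not part of this change): with BlockPolicy installed on a saturated executor, the submitting thread parks on the work queue until a worker frees up, rather than receiving a RejectedExecutionException. This matches how the JobLauncher executor in BaseConfig is configured above (zero queue capacity plus BlockPolicy).

// Illustrative sketch, not part of this change.
import ca.uhn.fhir.jpa.search.reindex.BlockPolicy;

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockPolicySketch {

    public static void main(String[] args) throws InterruptedException {
        // One worker thread and a direct hand-off queue: the first task occupies
        // the only thread, so the second execute() is rejected and BlockPolicy
        // parks the main thread on the queue until the worker is free again.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), new BlockPolicy());

        executor.execute(BlockPolicySketch::sleepBriefly); // starts immediately
        executor.execute(BlockPolicySketch::sleepBriefly); // blocks here for roughly 500 ms

        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }

    private static void sleepBriefly() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}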