Add some testing to the bulk export service
This commit is contained in:
parent
5e74a6f3e8
commit
6b21e8cd90
|
@ -60,6 +60,8 @@ ca.uhn.fhir.validation.ValidationResult.noIssuesDetected=No issues detected duri
|
|||
|
||||
# JPA Messages
|
||||
|
||||
ca.uhn.fhir.jpa.bulk.BulkDataExportSvcImpl.onlyBinarySelected=Binary resources may not be exported with bulk export
|
||||
ca.uhn.fhir.jpa.bulk.BulkDataExportSvcImpl.unknownResourceType=Unknown or unsupported resource type: {0}
|
||||
|
||||
ca.uhn.fhir.jpa.config.HapiFhirHibernateJpaDialect.resourceVersionConstraintFailure=The operation has failed with a version constraint failure. This generally means that two clients/threads were trying to update the same resource at the same time, and this request was chosen as the failing request.
|
||||
ca.uhn.fhir.jpa.config.HapiFhirHibernateJpaDialect.resourceIndexedCompositeStringUniqueConstraintFailure=The operation has failed with a unique index constraint failure. This probably means that the operation was trying to create/update a resource that would have resulted in a duplicate value for a unique index.
|
||||
|
|
|
@ -26,7 +26,6 @@ import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
|
|||
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
|
||||
import ca.uhn.fhir.util.BinaryUtil;
|
||||
import ca.uhn.fhir.util.StopWatch;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.lang3.time.DateUtils;
|
||||
import org.hl7.fhir.instance.model.api.IBaseBinary;
|
||||
|
@ -53,6 +52,7 @@ import java.io.OutputStreamWriter;
|
|||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static ca.uhn.fhir.util.UrlUtil.escapeUrlParam;
|
||||
import static org.apache.commons.lang3.StringUtils.isNotBlank;
|
||||
|
@ -80,7 +80,7 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
|
|||
private TransactionTemplate myTxTemplate;
|
||||
|
||||
private long myFileMaxChars = 500 * FileUtils.ONE_KB;
|
||||
private int myRetentionPeriod = (int) DateUtils.MILLIS_PER_DAY;
|
||||
private int myRetentionPeriod = (int) (2 * DateUtils.MILLIS_PER_HOUR);
|
||||
|
||||
/**
|
||||
* This method is called by the scheduler to run a pass of the
|
||||
|
@ -332,7 +332,12 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
|
|||
return toSubmittedJobInfo(existing.iterator().next());
|
||||
}
|
||||
|
||||
if (theResourceTypes == null || resourceTypes.isEmpty()) {
|
||||
if (resourceTypes != null && resourceTypes.contains("Binary")) {
|
||||
String msg = myContext.getLocalizer().getMessage(BulkDataExportSvcImpl.class, "onlyBinarySelected");
|
||||
throw new InvalidRequestException(msg);
|
||||
}
|
||||
|
||||
if (resourceTypes == null || resourceTypes.isEmpty()) {
|
||||
// This is probably not a useful default, but having the default be "download the whole
|
||||
// server" seems like a risky default too. We'll deal with that by having the default involve
|
||||
// only returning a small time span
|
||||
|
@ -342,6 +347,12 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
|
|||
}
|
||||
}
|
||||
|
||||
resourceTypes =
|
||||
resourceTypes
|
||||
.stream()
|
||||
.filter(t -> !"Binary".equals(t))
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
BulkExportJobEntity job = new BulkExportJobEntity();
|
||||
job.setJobId(UUID.randomUUID().toString());
|
||||
job.setStatus(BulkJobStatusEnum.SUBMITTED);
|
||||
|
@ -354,7 +365,8 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
|
|||
|
||||
for (String nextType : resourceTypes) {
|
||||
if (!myDaoRegistry.isResourceTypeSupported(nextType)) {
|
||||
throw new InvalidRequestException("Unknown or unsupported resource type: " + nextType);
|
||||
String msg = myContext.getLocalizer().getMessage(BulkDataExportSvcImpl.class, "unknownResourceType", nextType);
|
||||
throw new InvalidRequestException(msg);
|
||||
}
|
||||
|
||||
BulkExportCollectionEntity collection = new BulkExportCollectionEntity();
|
||||
|
|
|
@ -10,6 +10,7 @@ import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
|
|||
import ca.uhn.fhir.rest.api.Constants;
|
||||
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
|
||||
import ca.uhn.fhir.test.utilities.UnregisterScheduledProcessor;
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.commons.lang3.time.DateUtils;
|
||||
import org.hl7.fhir.instance.model.api.IIdType;
|
||||
|
@ -59,6 +60,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
|
|||
job.setStatus(BulkJobStatusEnum.COMPLETE);
|
||||
job.setExpiry(DateUtils.addHours(new Date(), -1));
|
||||
job.setJobId(UUID.randomUUID().toString());
|
||||
job.setCreated(new Date());
|
||||
job.setRequest("$export");
|
||||
myBulkExportJobDao.save(job);
|
||||
|
||||
|
@ -108,17 +110,17 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testCreateBulkLoad_NoResourceTypes() {
|
||||
public void testCreateBulkLoad_OnlyBinarySelected() {
|
||||
try {
|
||||
myBulkDataExportSvc.submitJob(Constants.CT_FHIR_NDJSON, Sets.newHashSet(), null, null);
|
||||
myBulkDataExportSvc.submitJob(Constants.CT_FHIR_JSON_NEW, Sets.newHashSet("Binary"), null, null);
|
||||
fail();
|
||||
} catch (InvalidRequestException e) {
|
||||
assertEquals("No resource types specified", e.getMessage());
|
||||
assertEquals("Invalid output format: application/fhir+json", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateBulkLoad_InvalidResourceTypes() {
|
||||
public void testSubmit_InvalidResourceTypes() {
|
||||
try {
|
||||
myBulkDataExportSvc.submitJob(Constants.CT_FHIR_NDJSON, Sets.newHashSet("Patient", "FOO"), null, null);
|
||||
fail();
|
||||
|
@ -128,7 +130,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testCreateBulkLoad() {
|
||||
public void testSubmitForSpecificResources() {
|
||||
|
||||
// Create some resources to load
|
||||
createResources();
|
||||
|
@ -170,6 +172,56 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSubmitWithoutSpecificResources() {
|
||||
|
||||
// Create some resources to load
|
||||
createResources();
|
||||
|
||||
// Binary shouldn't be included in the results so we'll add one here
|
||||
// and make sure it isn't included in the results
|
||||
Binary b = new Binary();
|
||||
b.setContentType("text/plain");
|
||||
b.setContent("text".getBytes(Charsets.UTF_8));
|
||||
myBinaryDao.create(b);
|
||||
|
||||
// Create a bulk job
|
||||
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(null, null, null, null);
|
||||
assertNotNull(jobDetails.getJobId());
|
||||
|
||||
// Check the status
|
||||
IBulkDataExportSvc.JobInfo status = myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(jobDetails.getJobId());
|
||||
assertEquals(BulkJobStatusEnum.SUBMITTED, status.getStatus());
|
||||
assertEquals("/$export?_outputFormat=application%2Ffhir%2Bndjson", status.getRequest());
|
||||
|
||||
// Run a scheduled pass to build the export
|
||||
myBulkDataExportSvc.buildExportFiles();
|
||||
|
||||
// Fetch the job again
|
||||
status = myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(jobDetails.getJobId());
|
||||
assertEquals(BulkJobStatusEnum.COMPLETE, status.getStatus());
|
||||
assertEquals(2, status.getFiles().size());
|
||||
|
||||
// Iterate over the files
|
||||
for (IBulkDataExportSvc.FileEntry next : status.getFiles()) {
|
||||
Binary nextBinary = myBinaryDao.read(next.getResourceId());
|
||||
assertEquals(Constants.CT_FHIR_NDJSON, nextBinary.getContentType());
|
||||
String nextContents = new String(nextBinary.getContent(), Constants.CHARSET_UTF8);
|
||||
ourLog.info("Next contents for type {}:\n{}", next.getResourceType(), nextContents);
|
||||
|
||||
if ("Patient".equals(next.getResourceType())) {
|
||||
assertThat(nextContents, containsString("\"value\":\"PAT0\"}]}\n"));
|
||||
assertEquals(10, nextContents.split("\n").length);
|
||||
} else if ("Observation".equals(next.getResourceType())) {
|
||||
assertThat(nextContents, containsString("\"subject\":{\"reference\":\"Patient/PAT0\"}}\n"));
|
||||
assertEquals(10, nextContents.split("\n").length);
|
||||
} else {
|
||||
fail(next.getResourceType());
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSubmitReusesExisting() {
|
||||
|
||||
|
@ -186,7 +238,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
|
|||
|
||||
|
||||
@Test
|
||||
public void testCreateBulkLoad_WithSince() throws InterruptedException {
|
||||
public void testSubmit_WithSince() throws InterruptedException {
|
||||
|
||||
// Create some resources to load
|
||||
createResources();
|
||||
|
|
|
@ -64,7 +64,7 @@ public class JdbcUtils {
|
|||
try {
|
||||
metadata = connection.getMetaData();
|
||||
|
||||
ResultSet indexes = metadata.getIndexInfo(connection.getCatalog(), connection.getSchema(), massageIdentifier(metadata, theTableName), false, false);
|
||||
ResultSet indexes = getIndexInfo(theTableName, connection, metadata, false);
|
||||
Set<String> indexNames = new HashSet<>();
|
||||
while (indexes.next()) {
|
||||
ourLog.debug("*** Next index: {}", new ColumnMapRowMapper().mapRow(indexes, 0));
|
||||
|
@ -73,7 +73,7 @@ public class JdbcUtils {
|
|||
indexNames.add(indexName);
|
||||
}
|
||||
|
||||
indexes = metadata.getIndexInfo(connection.getCatalog(), connection.getSchema(), massageIdentifier(metadata, theTableName), true, false);
|
||||
indexes = getIndexInfo(theTableName, connection, metadata, true);
|
||||
while (indexes.next()) {
|
||||
ourLog.debug("*** Next index: {}", new ColumnMapRowMapper().mapRow(indexes, 0));
|
||||
String indexName = indexes.getString("INDEX_NAME");
|
||||
|
@ -99,7 +99,7 @@ public class JdbcUtils {
|
|||
DatabaseMetaData metadata;
|
||||
try {
|
||||
metadata = connection.getMetaData();
|
||||
ResultSet indexes = metadata.getIndexInfo(connection.getCatalog(), connection.getSchema(), massageIdentifier(metadata, theTableName), false, false);
|
||||
ResultSet indexes = getIndexInfo(theTableName, connection, metadata, false);
|
||||
|
||||
while (indexes.next()) {
|
||||
String indexName = indexes.getString("INDEX_NAME");
|
||||
|
@ -118,6 +118,12 @@ public class JdbcUtils {
|
|||
}
|
||||
}
|
||||
|
||||
private static ResultSet getIndexInfo(String theTableName, Connection theConnection, DatabaseMetaData theMetadata, boolean theUnique) throws SQLException {
|
||||
// FYI Using approximate=false causes a very slow table scan on Oracle
|
||||
boolean approximate = true;
|
||||
return theMetadata.getIndexInfo(theConnection.getCatalog(), theConnection.getSchema(), massageIdentifier(theMetadata, theTableName), theUnique, approximate);
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve all index names
|
||||
*/
|
||||
|
|
Loading…
Reference in New Issue