Make sure that reindexing happens correctly
This commit is contained in:
parent
e8482f5c87
commit
2e030eebaa
|
@ -214,7 +214,8 @@ public class SearchParamExtractorService {
|
|||
|
||||
private void extractCompositeStringUniques(ResourceTable theEntity, ResourceIndexedSearchParams theParams) {
|
||||
|
||||
List<JpaRuntimeSearchParam> uniqueSearchParams = mySearchParamRegistry.getActiveUniqueSearchParams(theEntity.getResourceType());
|
||||
String resourceType = theEntity.getResourceType();
|
||||
List<JpaRuntimeSearchParam> uniqueSearchParams = mySearchParamRegistry.getActiveUniqueSearchParams(resourceType);
|
||||
|
||||
for (JpaRuntimeSearchParam next : uniqueSearchParams) {
|
||||
|
||||
|
@ -282,7 +283,7 @@ public class SearchParamExtractorService {
|
|||
}
|
||||
}
|
||||
|
||||
Set<String> queryStringsToPopulate = theParams.extractCompositeStringUniquesValueChains(theEntity.getResourceType(), partsChoices);
|
||||
Set<String> queryStringsToPopulate = theParams.extractCompositeStringUniquesValueChains(resourceType, partsChoices);
|
||||
|
||||
for (String nextQueryString : queryStringsToPopulate) {
|
||||
if (isNotBlank(nextQueryString)) {
|
||||
|
|
|
@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.entity;
|
|||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
|
@ -21,6 +21,8 @@ package ca.uhn.fhir.jpa.entity;
|
|||
*/
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.lang3.builder.ToStringBuilder;
|
||||
import org.apache.commons.lang3.builder.ToStringStyle;
|
||||
|
||||
import javax.persistence.*;
|
||||
import java.io.Serializable;
|
||||
|
@ -110,4 +112,20 @@ public class ResourceReindexJobEntity implements Serializable {
|
|||
public void setDeleted(boolean theDeleted) {
|
||||
myDeleted = theDeleted;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
ToStringBuilder b = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
|
||||
.append("id", myId)
|
||||
.append("resourceType", myResourceType)
|
||||
.append("thresholdLow", myThresholdLow)
|
||||
.append("thresholdHigh", myThresholdHigh);
|
||||
if (myDeleted) {
|
||||
b.append("deleted", myDeleted);
|
||||
}
|
||||
if (mySuspendedUntil != null) {
|
||||
b.append("suspendedUntil", mySuspendedUntil);
|
||||
}
|
||||
return b.toString();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -245,8 +245,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
|
|||
});
|
||||
} catch (Exception e) {
|
||||
ourLog.warn("Failed to activate search: {}", e.toString());
|
||||
// FIXME: aaaaa
|
||||
ourLog.info("Failed to activate search", e);
|
||||
ourLog.trace("Failed to activate search", e);
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
|
@ -518,10 +517,6 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
|
|||
* user has requested resources 0-60, then they would get 0-50 back but the search
|
||||
* coordinator would then stop searching.SearchCoordinatorSvcImplTest
|
||||
*/
|
||||
// FIXME: aaaaaaaa
|
||||
// List<Long> remainingResources = SearchCoordinatorSvcImpl.this.getResources(mySearch.getUuid(), mySyncedPids.size(), theToIndex);
|
||||
// ourLog.debug("Adding {} resources to the existing {} synced resource IDs", remainingResources.size(), mySyncedPids.size());
|
||||
// mySyncedPids.addAll(remainingResources);
|
||||
keepWaiting = false;
|
||||
break;
|
||||
case FAILED:
|
||||
|
|
|
@ -20,6 +20,11 @@ package ca.uhn.fhir.jpa.search.reindex;
|
|||
* #L%
|
||||
*/
|
||||
|
||||
import org.apache.commons.lang3.time.DateUtils;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
|
||||
import javax.transaction.Transactional;
|
||||
|
||||
public interface IResourceReindexingSvc {
|
||||
|
||||
/**
|
||||
|
@ -34,7 +39,10 @@ public interface IResourceReindexingSvc {
|
|||
|
||||
/**
|
||||
* Called automatically by the job scheduler
|
||||
*
|
||||
*/
|
||||
void scheduleReindexingPass();
|
||||
|
||||
/**
|
||||
* @return Returns null if the system did not attempt to perform a pass because one was
|
||||
* already proceeding. Otherwise, returns the number of resources affected.
|
||||
*/
|
||||
|
|
|
@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.search.reindex;
|
|||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
|
@ -36,6 +36,7 @@ import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
|
|||
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
|
||||
import ca.uhn.fhir.util.StopWatch;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import org.apache.commons.lang3.Validate;
|
||||
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
||||
import org.apache.commons.lang3.time.DateUtils;
|
||||
|
@ -71,8 +72,9 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
|
|||
|
||||
public class ResourceReindexingSvcImpl implements IResourceReindexingSvc {
|
||||
|
||||
private static final Date BEGINNING_OF_TIME = new Date(0);
|
||||
static final Date BEGINNING_OF_TIME = new Date(0);
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(ResourceReindexingSvcImpl.class);
|
||||
public static final int PASS_SIZE = 25000;
|
||||
private final ReentrantLock myIndexingLock = new ReentrantLock();
|
||||
@Autowired
|
||||
private IResourceReindexJobDao myReindexJobDao;
|
||||
|
@ -176,6 +178,13 @@ public class ResourceReindexingSvcImpl implements IResourceReindexingSvc {
|
|||
@Override
|
||||
@Transactional(Transactional.TxType.NEVER)
|
||||
@Scheduled(fixedDelay = 10 * DateUtils.MILLIS_PER_SECOND)
|
||||
public void scheduleReindexingPass() {
|
||||
runReindexingPass();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
@Transactional(Transactional.TxType.NEVER)
|
||||
public Integer runReindexingPass() {
|
||||
if (myDaoConfig.isSchedulingDisabled()) {
|
||||
return null;
|
||||
|
@ -223,10 +232,16 @@ public class ResourceReindexingSvcImpl implements IResourceReindexingSvc {
|
|||
Collection<ResourceReindexJobEntity> jobs = myTxTemplate.execute(t -> myReindexJobDao.findAll(PageRequest.of(0, 10), false));
|
||||
assert jobs != null;
|
||||
|
||||
if (jobs.size() > 0) {
|
||||
ourLog.info("Running {} reindex jobs: {}", jobs.size(), jobs);
|
||||
} else {
|
||||
ourLog.debug("Running {} reindex jobs: {}", jobs.size(), jobs);
|
||||
}
|
||||
|
||||
int count = 0;
|
||||
for (ResourceReindexJobEntity next : jobs) {
|
||||
|
||||
if (next.getThresholdHigh().getTime() < System.currentTimeMillis()) {
|
||||
if (next.getThresholdLow() != null && next.getThresholdLow().getTime() >= next.getThresholdHigh().getTime()) {
|
||||
markJobAsDeleted(next);
|
||||
continue;
|
||||
}
|
||||
|
@ -236,9 +251,10 @@ public class ResourceReindexingSvcImpl implements IResourceReindexingSvc {
|
|||
return count;
|
||||
}
|
||||
|
||||
private void markJobAsDeleted(ResourceReindexJobEntity next) {
|
||||
private void markJobAsDeleted(ResourceReindexJobEntity theJob) {
|
||||
ourLog.info("Marking reindexing job ID[{}] as deleted", theJob.getId());
|
||||
myTxTemplate.execute(t -> {
|
||||
myReindexJobDao.markAsDeletedById(next.getId());
|
||||
myReindexJobDao.markAsDeletedById(theJob.getId());
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
@ -259,8 +275,9 @@ public class ResourceReindexingSvcImpl implements IResourceReindexingSvc {
|
|||
Date high = theJob.getThresholdHigh();
|
||||
|
||||
// Query for resources within threshold
|
||||
StopWatch pageSw = new StopWatch();
|
||||
Slice<Long> range = myTxTemplate.execute(t -> {
|
||||
PageRequest page = PageRequest.of(0, 10000);
|
||||
PageRequest page = PageRequest.of(0, PASS_SIZE);
|
||||
if (isNotBlank(theJob.getResourceType())) {
|
||||
return myResourceTableDao.findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(page, theJob.getResourceType(), low, high);
|
||||
} else {
|
||||
|
@ -269,6 +286,13 @@ public class ResourceReindexingSvcImpl implements IResourceReindexingSvc {
|
|||
});
|
||||
Validate.notNull(range);
|
||||
int count = range.getNumberOfElements();
|
||||
ourLog.info("Loaded {} resources for reindexing in {}", count, pageSw.toString());
|
||||
|
||||
// If we didn't find any results at all, mark as deleted
|
||||
if (count == 0) {
|
||||
markJobAsDeleted(theJob);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Submit each resource requiring reindexing
|
||||
List<Future<Date>> futures = range
|
||||
|
@ -304,18 +328,15 @@ public class ResourceReindexingSvcImpl implements IResourceReindexingSvc {
|
|||
}
|
||||
}
|
||||
|
||||
// Just in case we end up in some sort of infinite loop. This shouldn't happen, and couldn't really
|
||||
// happen unless there were 10000 resources with the exact same update time down to the
|
||||
// millisecond.
|
||||
Date newLow;
|
||||
if (latestDate == null) {
|
||||
markJobAsDeleted(theJob);
|
||||
return 0;
|
||||
}
|
||||
if (latestDate.getTime() == low.getTime()) {
|
||||
ourLog.error("Final pass time for reindex JOB[{}] has same ending low value: {}", theJob.getId(), latestDate);
|
||||
newLow = new Date(latestDate.getTime() + 1);
|
||||
} else if (!haveMultipleDates) {
|
||||
if (count == PASS_SIZE) {
|
||||
// Just in case we end up in some sort of infinite loop. This shouldn't happen, and couldn't really
|
||||
// happen unless there were 10000 resources with the exact same update time down to the
|
||||
// millisecond.
|
||||
ourLog.error("Final pass time for reindex JOB[{}] has same ending low value: {}", theJob.getId(), latestDate);
|
||||
}
|
||||
|
||||
newLow = new Date(latestDate.getTime() + 1);
|
||||
} else {
|
||||
newLow = latestDate;
|
||||
|
@ -326,7 +347,7 @@ public class ResourceReindexingSvcImpl implements IResourceReindexingSvc {
|
|||
return null;
|
||||
});
|
||||
|
||||
ourLog.info("Completed pass of reindex JOB[{}] - Indexed {} resources in {} ({} / sec) - Have indexed until: {}", theJob.getId(), count, sw.toString(), sw.formatThroughput(count, TimeUnit.SECONDS), theJob.getThresholdLow());
|
||||
ourLog.info("Completed pass of reindex JOB[{}] - Indexed {} resources in {} ({} / sec) - Have indexed until: {}", theJob.getId(), count, sw.toString(), sw.formatThroughput(count, TimeUnit.SECONDS), newLow);
|
||||
return counter.get();
|
||||
}
|
||||
|
||||
|
@ -450,6 +471,7 @@ public class ResourceReindexingSvcImpl implements IResourceReindexingSvc {
|
|||
return e;
|
||||
}
|
||||
});
|
||||
|
||||
} catch (ResourceVersionConflictException e) {
|
||||
/*
|
||||
* We reindex in multiple threads, so it's technically possible that two threads try
|
||||
|
|
|
@ -27,6 +27,8 @@ import org.apache.commons.lang3.Validate;
|
|||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.hl7.fhir.instance.model.api.IIdType;
|
||||
|
||||
import static org.apache.commons.lang3.StringUtils.isNotBlank;
|
||||
|
||||
@JsonInclude(JsonInclude.Include.NON_NULL)
|
||||
@JsonAutoDetect(creatorVisibility = JsonAutoDetect.Visibility.NONE, fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
|
||||
public class ResourceDeliveryMessage {
|
||||
|
@ -62,9 +64,8 @@ public class ResourceDeliveryMessage {
|
|||
}
|
||||
|
||||
public IBaseResource getPayload(FhirContext theCtx) {
|
||||
Validate.notNull(myPayloadString);
|
||||
IBaseResource retVal = myPayload;
|
||||
if (retVal == null) {
|
||||
if (retVal == null && isNotBlank(myPayloadString)) {
|
||||
retVal = theCtx.newJsonParser().parseResource(myPayloadString);
|
||||
myPayload = retVal;
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ package ca.uhn.fhir.jpa.subscription.resthook;
|
|||
*/
|
||||
|
||||
import ca.uhn.fhir.context.FhirContext;
|
||||
import ca.uhn.fhir.context.RuntimeResourceDefinition;
|
||||
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
|
||||
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionDeliverySubscriber;
|
||||
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionInterceptor;
|
||||
|
@ -129,12 +130,14 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDe
|
|||
protected IBaseResource getAndMassagePayload(ResourceDeliveryMessage theMsg, CanonicalSubscription theSubscription) {
|
||||
IBaseResource payloadResource = theMsg.getPayload(getContext());
|
||||
|
||||
if (theSubscription.getRestHookDetails().isDeliverLatestVersion()) {
|
||||
IFhirResourceDao dao = getSubscriptionInterceptor().getDao(payloadResource.getClass());
|
||||
if (payloadResource == null || theSubscription.getRestHookDetails().isDeliverLatestVersion()) {
|
||||
IIdType payloadId = theMsg.getPayloadId(getContext());
|
||||
RuntimeResourceDefinition resourceDef = getContext().getResourceDefinition(payloadId.getResourceType());
|
||||
IFhirResourceDao dao = getSubscriptionInterceptor().getDao(resourceDef.getImplementingClass());
|
||||
try {
|
||||
payloadResource = dao.read(payloadResource.getIdElement().toVersionless());
|
||||
payloadResource = dao.read(payloadId.toVersionless());
|
||||
} catch (ResourceGoneException e) {
|
||||
ourLog.warn("Resource {} is deleted, not going to deliver for subscription {}", payloadResource.getIdElement(), theSubscription.getIdElement(getContext()));
|
||||
ourLog.warn("Resource {} is deleted, not going to deliver for subscription {}", payloadId.toVersionless(), theSubscription.getIdElement(getContext()));
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,109 @@
|
|||
package ca.uhn.fhir.jpa.dao.r4;
|
||||
|
||||
import ca.uhn.fhir.jpa.entity.ResourceHistoryTable;
|
||||
import ca.uhn.fhir.jpa.entity.ResourceTable;
|
||||
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
|
||||
import ca.uhn.fhir.util.TestUtil;
|
||||
import org.hl7.fhir.instance.model.api.IIdType;
|
||||
import org.hl7.fhir.r4.model.*;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class FhirResourceDaoR4DeleteTest extends BaseJpaR4Test {
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(FhirResourceDaoR4DeleteTest.class);
|
||||
|
||||
@Test
|
||||
public void testDeleteMarksResourceAndVersionAsDeleted() {
|
||||
|
||||
Patient p = new Patient();
|
||||
p.setActive(true);
|
||||
IIdType id = myPatientDao.create(p).getId().toUnqualifiedVersionless();
|
||||
|
||||
myPatientDao.delete(id);
|
||||
|
||||
// Table should be marked as deleted
|
||||
runInTransaction(()->{
|
||||
ResourceTable resourceTable = myResourceTableDao.findById(id.getIdPartAsLong()).get();
|
||||
assertNotNull(resourceTable.getDeleted());
|
||||
});
|
||||
|
||||
// Current version should be marked as deleted
|
||||
runInTransaction(()->{
|
||||
ResourceHistoryTable resourceTable = myResourceHistoryTableDao.findForIdAndVersion(id.getIdPartAsLong(), 1);
|
||||
assertNull(resourceTable.getDeleted());
|
||||
});
|
||||
runInTransaction(()->{
|
||||
ResourceHistoryTable resourceTable = myResourceHistoryTableDao.findForIdAndVersion(id.getIdPartAsLong(), 2);
|
||||
assertNotNull(resourceTable.getDeleted());
|
||||
});
|
||||
|
||||
try {
|
||||
myPatientDao.read(id.toUnqualifiedVersionless());
|
||||
fail();
|
||||
} catch (ResourceGoneException e) {
|
||||
// good
|
||||
}
|
||||
|
||||
myPatientDao.read(id.toUnqualifiedVersionless().withVersion("1"));
|
||||
|
||||
try {
|
||||
myPatientDao.read(id.toUnqualifiedVersionless().withVersion("2"));
|
||||
fail();
|
||||
} catch (ResourceGoneException e) {
|
||||
// good
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testResourceIsConsideredDeletedIfOnlyResourceTableEntryIsDeleted() {
|
||||
|
||||
Patient p = new Patient();
|
||||
p.setActive(true);
|
||||
IIdType id = myPatientDao.create(p).getId().toUnqualifiedVersionless();
|
||||
|
||||
myPatientDao.delete(id);
|
||||
|
||||
// Table should be marked as deleted
|
||||
runInTransaction(()->{
|
||||
ResourceTable resourceTable = myResourceTableDao.findById(id.getIdPartAsLong()).get();
|
||||
assertNotNull(resourceTable.getDeleted());
|
||||
});
|
||||
|
||||
// Mark the current history version as not-deleted even though the actual resource
|
||||
// table entry is marked deleted
|
||||
runInTransaction(()->{
|
||||
ResourceHistoryTable resourceTable = myResourceHistoryTableDao.findForIdAndVersion(id.getIdPartAsLong(), 2);
|
||||
resourceTable.setDeleted(null);
|
||||
myResourceHistoryTableDao.save(resourceTable);
|
||||
});
|
||||
|
||||
try {
|
||||
myPatientDao.read(id.toUnqualifiedVersionless());
|
||||
fail();
|
||||
} catch (ResourceGoneException e) {
|
||||
// good
|
||||
}
|
||||
|
||||
myPatientDao.read(id.toUnqualifiedVersionless().withVersion("1"));
|
||||
|
||||
try {
|
||||
myPatientDao.read(id.toUnqualifiedVersionless().withVersion("2"));
|
||||
fail();
|
||||
} catch (ResourceGoneException e) {
|
||||
// good
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterClassClearContext() {
|
||||
TestUtil.clearAllStaticFieldsForUnitTest();
|
||||
}
|
||||
|
||||
}
|
|
@ -1,6 +1,7 @@
|
|||
package ca.uhn.fhir.jpa.dao.r4;
|
||||
|
||||
import ca.uhn.fhir.jpa.dao.DaoConfig;
|
||||
import ca.uhn.fhir.jpa.dao.ISearchParamRegistry;
|
||||
import ca.uhn.fhir.jpa.dao.SearchBuilder;
|
||||
import ca.uhn.fhir.jpa.dao.SearchParameterMap;
|
||||
import ca.uhn.fhir.jpa.entity.ResourceIndexedCompositeStringUnique;
|
||||
|
@ -12,6 +13,7 @@ import ca.uhn.fhir.rest.param.TokenParam;
|
|||
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
|
||||
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
|
||||
import ca.uhn.fhir.util.TestUtil;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.hl7.fhir.instance.model.api.IIdType;
|
||||
import org.hl7.fhir.r4.model.*;
|
||||
import org.hl7.fhir.r4.model.Enumerations.PublicationStatus;
|
||||
|
@ -19,6 +21,7 @@ import org.junit.After;
|
|||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.test.context.TestPropertySource;
|
||||
import org.springframework.transaction.TransactionStatus;
|
||||
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
|
||||
|
@ -47,12 +50,14 @@ public class FhirResourceDaoR4UniqueSearchParamTest extends BaseJpaR4Test {
|
|||
myDaoConfig.setDefaultSearchParamsCanBeOverridden(new DaoConfig().isDefaultSearchParamsCanBeOverridden());
|
||||
myDaoConfig.setUniqueIndexesCheckedBeforeSave(new DaoConfig().isUniqueIndexesCheckedBeforeSave());
|
||||
myDaoConfig.setSchedulingDisabled(new DaoConfig().isSchedulingDisabled());
|
||||
myDaoConfig.setUniqueIndexesEnabled(new DaoConfig().isUniqueIndexesEnabled());
|
||||
}
|
||||
|
||||
@Before
|
||||
public void before() {
|
||||
myDaoConfig.setDefaultSearchParamsCanBeOverridden(true);
|
||||
myDaoConfig.setSchedulingDisabled(true);
|
||||
myDaoConfig.setUniqueIndexesEnabled(true);
|
||||
SearchBuilder.resetLastHandlerMechanismForUnitTest();
|
||||
}
|
||||
|
||||
|
@ -419,6 +424,10 @@ public class FhirResourceDaoR4UniqueSearchParamTest extends BaseJpaR4Test {
|
|||
|
||||
}
|
||||
|
||||
@Autowired
|
||||
private ISearchParamRegistry mySearchParamRegistry;
|
||||
|
||||
|
||||
@Test
|
||||
public void testDuplicateUniqueValuesAreReIndexed() {
|
||||
myDaoConfig.setSchedulingDisabled(true);
|
||||
|
@ -449,6 +458,10 @@ public class FhirResourceDaoR4UniqueSearchParamTest extends BaseJpaR4Test {
|
|||
|
||||
createUniqueObservationSubjectDateCode();
|
||||
|
||||
List<JpaRuntimeSearchParam> uniqueSearchParams = mySearchParamRegistry.getActiveUniqueSearchParams("Observation");
|
||||
assertEquals(1, uniqueSearchParams.size());
|
||||
assertEquals(3, uniqueSearchParams.get(0).getComponents().size());
|
||||
|
||||
myResourceReindexingSvc.markAllResourcesForReindexing();
|
||||
myResourceReindexingSvc.forceReindexingPass();
|
||||
myResourceReindexingSvc.forceReindexingPass();
|
||||
|
|
|
@ -90,21 +90,52 @@ public class ResourceReindexingSvcImplTest extends BaseJpaTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testMarkJobsPastThresholdAsDeleted() {
|
||||
public void testReindexPassOnlyReturnsValuesAtLowThreshold() {
|
||||
mockNothingToExpunge();
|
||||
mockSingleReindexingJob(null);
|
||||
mockFourResourcesNeedReindexing();
|
||||
mockFetchFourResources();
|
||||
mockFinalResourceNeedsReindexing();
|
||||
|
||||
mySingleJob.setThresholdHigh(DateUtils.addMinutes(new Date(), -1));
|
||||
mySingleJob.setThresholdLow(new Date(40 * DateUtils.MILLIS_PER_DAY));
|
||||
Date highThreshold = DateUtils.addMinutes(new Date(), -1);
|
||||
mySingleJob.setThresholdHigh(highThreshold);
|
||||
|
||||
// Run the second pass, which should index no resources (meaning it's time to mark as deleted)
|
||||
mySvc.forceReindexingPass();
|
||||
|
||||
verify(myResourceTableDao, never()).findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(any(), any(), any());
|
||||
verify(myResourceTableDao, never()).findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(any(), any(), any(), any());
|
||||
verify(myReindexJobDao, times(1)).markAsDeletedById(myIdCaptor.capture());
|
||||
verify(myReindexJobDao, never()).markAsDeletedById(any());
|
||||
verify(myResourceTableDao, times(1)).findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(myPageRequestCaptor.capture(), myLowCaptor.capture(), myHighCaptor.capture());
|
||||
assertEquals(new Date(40 * DateUtils.MILLIS_PER_DAY), myLowCaptor.getAllValues().get(0));
|
||||
assertEquals(highThreshold, myHighCaptor.getAllValues().get(0));
|
||||
|
||||
assertEquals(123L, myIdCaptor.getValue().longValue());
|
||||
// Should mark the low threshold as 1 milli higher than the ne returned item
|
||||
verify(myReindexJobDao, times(1)).setThresholdLow(eq(123L), eq(new Date((40 * DateUtils.MILLIS_PER_DAY)+1L)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMarkAsDeletedIfNothingIndexed() {
|
||||
mockNothingToExpunge();
|
||||
mockSingleReindexingJob(null);
|
||||
mockFetchFourResources();
|
||||
// Mock resource fetch
|
||||
List<Long> values = Collections.emptyList();
|
||||
when(myResourceTableDao.findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(any(),any(),any())).thenReturn(new SliceImpl<>(values));
|
||||
|
||||
mySingleJob.setThresholdLow(new Date(40 * DateUtils.MILLIS_PER_DAY));
|
||||
Date highThreshold = DateUtils.addMinutes(new Date(), -1);
|
||||
mySingleJob.setThresholdHigh(highThreshold);
|
||||
|
||||
// Run the second pass, which should index no resources (meaning it's time to mark as deleted)
|
||||
mySvc.forceReindexingPass();
|
||||
verify(myResourceTableDao, never()).findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(any(), any(), any(), any());
|
||||
verify(myResourceTableDao, times(1)).findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(myPageRequestCaptor.capture(), myLowCaptor.capture(), myHighCaptor.capture());
|
||||
assertEquals(new Date(40 * DateUtils.MILLIS_PER_DAY), myLowCaptor.getAllValues().get(0));
|
||||
assertEquals(highThreshold, myHighCaptor.getAllValues().get(0));
|
||||
|
||||
// This time we shouldn't update the threshold
|
||||
verify(myReindexJobDao, never()).setThresholdLow(any(),any());
|
||||
|
||||
verify(myReindexJobDao, times(1)).markAsDeletedById(eq(123L));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -243,7 +274,13 @@ public class ResourceReindexingSvcImplTest extends BaseJpaTest {
|
|||
private void mockFourResourcesNeedReindexing() {
|
||||
// Mock resource fetch
|
||||
List<Long> values = Arrays.asList(0L, 1L, 2L, 3L);
|
||||
when(myResourceTableDao.findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(myPageRequestCaptor.capture(), myLowCaptor.capture(), myHighCaptor.capture())).thenReturn(new SliceImpl<>(values));
|
||||
when(myResourceTableDao.findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(any(),any(),any())).thenReturn(new SliceImpl<>(values));
|
||||
}
|
||||
|
||||
private void mockFinalResourceNeedsReindexing() {
|
||||
// Mock resource fetch
|
||||
List<Long> values = Arrays.asList(2L); // the second-last one has the highest time
|
||||
when(myResourceTableDao.findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(any(),any(),any())).thenReturn(new SliceImpl<>(values));
|
||||
}
|
||||
|
||||
private void mockSingleReindexingJob(String theResourceType) {
|
||||
|
|
|
@ -9,9 +9,9 @@ package ca.uhn.fhir.rest.server;
|
|||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
|
@ -346,20 +346,6 @@ public class RestfulServer extends HttpServlet implements IRestfulServer<Servlet
|
|||
handleRequest(RequestTypeEnum.PUT, request, response);
|
||||
}
|
||||
|
||||
/**
|
||||
* Count length of URL string, but treating unescaped sequences (e.g. ' ') as their unescaped equivalent (%20)
|
||||
*/
|
||||
protected static int escapedLength(String theServletPath) {
|
||||
int delta = 0;
|
||||
for (int i = 0; i < theServletPath.length(); i++) {
|
||||
char next = theServletPath.charAt(i);
|
||||
if (next == ' ') {
|
||||
delta = delta + 2;
|
||||
}
|
||||
}
|
||||
return theServletPath.length() + delta;
|
||||
}
|
||||
|
||||
private void findResourceMethods(Object theProvider) {
|
||||
|
||||
ourLog.info("Scanning type for RESTful methods: {}", theProvider.getClass());
|
||||
|
@ -564,6 +550,18 @@ public class RestfulServer extends HttpServlet implements IRestfulServer<Servlet
|
|||
return Collections.unmodifiableList(myInterceptors);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets (or clears) the list of interceptors
|
||||
*
|
||||
* @param theList The list of interceptors (may be null)
|
||||
*/
|
||||
public void setInterceptors(List<IServerInterceptor> theList) {
|
||||
myInterceptors.clear();
|
||||
if (theList != null) {
|
||||
myInterceptors.addAll(theList);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets (or clears) the list of interceptors
|
||||
*
|
||||
|
@ -578,18 +576,6 @@ public class RestfulServer extends HttpServlet implements IRestfulServer<Servlet
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets (or clears) the list of interceptors
|
||||
*
|
||||
* @param theList The list of interceptors (may be null)
|
||||
*/
|
||||
public void setInterceptors(List<IServerInterceptor> theList) {
|
||||
myInterceptors.clear();
|
||||
if (theList != null) {
|
||||
myInterceptors.addAll(theList);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IPagingProvider getPagingProvider() {
|
||||
return myPagingProvider;
|
||||
|
@ -616,13 +602,8 @@ public class RestfulServer extends HttpServlet implements IRestfulServer<Servlet
|
|||
*
|
||||
* @see #setResourceProviders(Collection)
|
||||
*/
|
||||
public void setPlainProviders(Collection<Object> theProviders) {
|
||||
Validate.noNullElements(theProviders, "theProviders must not contain any null elements");
|
||||
|
||||
myPlainProviders.clear();
|
||||
if (theProviders != null) {
|
||||
myPlainProviders.addAll(theProviders);
|
||||
}
|
||||
public void setPlainProviders(Object... theProv) {
|
||||
setPlainProviders(Arrays.asList(theProv));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -630,8 +611,13 @@ public class RestfulServer extends HttpServlet implements IRestfulServer<Servlet
|
|||
*
|
||||
* @see #setResourceProviders(Collection)
|
||||
*/
|
||||
public void setPlainProviders(Object... theProv) {
|
||||
setPlainProviders(Arrays.asList(theProv));
|
||||
public void setPlainProviders(Collection<Object> theProviders) {
|
||||
Validate.noNullElements(theProviders, "theProviders must not contain any null elements");
|
||||
|
||||
myPlainProviders.clear();
|
||||
if (theProviders != null) {
|
||||
myPlainProviders.addAll(theProviders);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -643,7 +629,8 @@ public class RestfulServer extends HttpServlet implements IRestfulServer<Servlet
|
|||
* @param servletPath the servelet path
|
||||
* @return created resource path
|
||||
*/
|
||||
protected static String getRequestPath(String requestFullPath, String servletContextPath, String servletPath) {
|
||||
// NOTE: Don't make this a static method!! People want to override it
|
||||
protected String getRequestPath(String requestFullPath, String servletContextPath, String servletPath) {
|
||||
return requestFullPath.substring(escapedLength(servletContextPath) + escapedLength(servletPath));
|
||||
}
|
||||
|
||||
|
@ -661,22 +648,22 @@ public class RestfulServer extends HttpServlet implements IRestfulServer<Servlet
|
|||
/**
|
||||
* Sets the resource providers for this server
|
||||
*/
|
||||
public void setResourceProviders(Collection<IResourceProvider> theProviders) {
|
||||
Validate.noNullElements(theProviders, "theProviders must not contain any null elements");
|
||||
|
||||
public void setResourceProviders(IResourceProvider... theResourceProviders) {
|
||||
myResourceProviders.clear();
|
||||
if (theProviders != null) {
|
||||
myResourceProviders.addAll(theProviders);
|
||||
if (theResourceProviders != null) {
|
||||
myResourceProviders.addAll(Arrays.asList(theResourceProviders));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the resource providers for this server
|
||||
*/
|
||||
public void setResourceProviders(IResourceProvider... theResourceProviders) {
|
||||
public void setResourceProviders(Collection<IResourceProvider> theProviders) {
|
||||
Validate.noNullElements(theProviders, "theProviders must not contain any null elements");
|
||||
|
||||
myResourceProviders.clear();
|
||||
if (theResourceProviders != null) {
|
||||
myResourceProviders.addAll(Arrays.asList(theResourceProviders));
|
||||
if (theProviders != null) {
|
||||
myResourceProviders.addAll(theProviders);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1648,6 +1635,20 @@ public class RestfulServer extends HttpServlet implements IRestfulServer<Servlet
|
|||
theResponse.getWriter().write(theException.getMessage());
|
||||
}
|
||||
|
||||
/**
|
||||
* Count length of URL string, but treating unescaped sequences (e.g. ' ') as their unescaped equivalent (%20)
|
||||
*/
|
||||
protected static int escapedLength(String theServletPath) {
|
||||
int delta = 0;
|
||||
for (int i = 0; i < theServletPath.length(); i++) {
|
||||
char next = theServletPath.charAt(i);
|
||||
if (next == ' ') {
|
||||
delta = delta + 2;
|
||||
}
|
||||
}
|
||||
return theServletPath.length() + delta;
|
||||
}
|
||||
|
||||
public static void throwUnknownFhirOperationException(RequestDetails requestDetails, String requestPath, RequestTypeEnum theRequestType, FhirContext theFhirContext) {
|
||||
throw new InvalidRequestException(theFhirContext.getLocalizer().getMessage(RestfulServer.class, "unknownMethod", theRequestType.name(), requestPath, requestDetails.getParameters().keySet()));
|
||||
}
|
||||
|
|
|
@ -81,6 +81,11 @@
|
|||
three Subscriber classes into @Scope("protoype") so their dependencies can be @Autowired injected
|
||||
as opposed to constructor parameters.
|
||||
</action>
|
||||
<action type="fix">
|
||||
A bug in the JPA resource reindexer was fixed: In many cases the reindexer would
|
||||
mark reindexing jobs as deleted before they had actually completed, leading to
|
||||
some resources not actually being reindexed.
|
||||
</action>
|
||||
</release>
|
||||
<release version="3.6.0" date="2018-11-12" description="Food">
|
||||
<action type="add">
|
||||
|
|
Loading…
Reference in New Issue