Fix resource reindexing on partitioned seerver (#2468)
* Fix resource reindexing on partitioned seerver * Add changelog * Test fix * Test fix * Tets fix * Test fix
This commit is contained in:
parent
2633cbd141
commit
0ad57c51bb
|
@ -107,6 +107,7 @@ ca.uhn.fhir.jpa.dao.BaseHapiFhirResourceDao.successfulUpdate=Successfully update
|
||||||
ca.uhn.fhir.jpa.dao.BaseHapiFhirResourceDao.successfulDeletes=Successfully deleted {0} resource(s) in {1}ms
|
ca.uhn.fhir.jpa.dao.BaseHapiFhirResourceDao.successfulDeletes=Successfully deleted {0} resource(s) in {1}ms
|
||||||
ca.uhn.fhir.jpa.dao.BaseHapiFhirResourceDao.invalidSearchParameter=Unknown search parameter "{0}" for resource type "{1}". Valid search parameters for this search are: {2}
|
ca.uhn.fhir.jpa.dao.BaseHapiFhirResourceDao.invalidSearchParameter=Unknown search parameter "{0}" for resource type "{1}". Valid search parameters for this search are: {2}
|
||||||
ca.uhn.fhir.jpa.dao.BaseHapiFhirResourceDao.invalidSortParameter=Unknown _sort parameter value "{0}" for resource type "{1}" (Note: sort parameters values must use a valid Search Parameter). Valid values for this search are: {2}
|
ca.uhn.fhir.jpa.dao.BaseHapiFhirResourceDao.invalidSortParameter=Unknown _sort parameter value "{0}" for resource type "{1}" (Note: sort parameters values must use a valid Search Parameter). Valid values for this search are: {2}
|
||||||
|
ca.uhn.fhir.jpa.dao.BaseHapiFhirResourceDao.updateWithNoId=Can not update resource of type {0} as it has no ID
|
||||||
|
|
||||||
ca.uhn.fhir.jpa.dao.BaseStorageDao.invalidBundleTypeForStorage=Unable to store a Bundle resource on this server with a Bundle.type value of: {0}. Note that if you are trying to perform a FHIR 'transaction' or 'batch' operation you should POST the Bundle resource to the Base URL of the server, not to the '/Bundle' endpoint.
|
ca.uhn.fhir.jpa.dao.BaseStorageDao.invalidBundleTypeForStorage=Unable to store a Bundle resource on this server with a Bundle.type value of: {0}. Note that if you are trying to perform a FHIR 'transaction' or 'batch' operation you should POST the Bundle resource to the Base URL of the server, not to the '/Bundle' endpoint.
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,5 @@
|
||||||
|
---
|
||||||
|
type: fix
|
||||||
|
issue: 2468
|
||||||
|
title: "Running a manual reindex of data failed on a partitioned server with an interceptor error. This has been
|
||||||
|
corrected. Thanks to Ajay Shekar for reporting!"
|
|
@ -167,6 +167,13 @@ public interface IFhirResourceDao<T extends IBaseResource> extends IDao {
|
||||||
*/
|
*/
|
||||||
T readByPid(ResourcePersistentId thePid);
|
T readByPid(ResourcePersistentId thePid);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read a resource by its internal PID
|
||||||
|
*/
|
||||||
|
default T readByPid(ResourcePersistentId thePid, boolean theDeletedOk) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param theRequestDetails TODO
|
* @param theRequestDetails TODO
|
||||||
* @throws ResourceNotFoundException If the ID is not known to the server
|
* @throws ResourceNotFoundException If the ID is not known to the server
|
||||||
|
|
|
@ -137,6 +137,7 @@ import java.util.UUID;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.apache.commons.lang3.StringUtils.defaultString;
|
import static org.apache.commons.lang3.StringUtils.defaultString;
|
||||||
|
import static org.apache.commons.lang3.StringUtils.isBlank;
|
||||||
import static org.apache.commons.lang3.StringUtils.isNotBlank;
|
import static org.apache.commons.lang3.StringUtils.isNotBlank;
|
||||||
|
|
||||||
public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends BaseHapiFhirDao<T> implements IFhirResourceDao<T> {
|
public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends BaseHapiFhirDao<T> implements IFhirResourceDao<T> {
|
||||||
|
@ -1055,13 +1056,19 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
|
||||||
@Override
|
@Override
|
||||||
@Transactional
|
@Transactional
|
||||||
public T readByPid(ResourcePersistentId thePid) {
|
public T readByPid(ResourcePersistentId thePid) {
|
||||||
|
return readByPid(thePid, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Transactional
|
||||||
|
public T readByPid(ResourcePersistentId thePid, boolean theDeletedOk) {
|
||||||
StopWatch w = new StopWatch();
|
StopWatch w = new StopWatch();
|
||||||
|
|
||||||
Optional<ResourceTable> entity = myResourceTableDao.findById(thePid.getIdAsLong());
|
Optional<ResourceTable> entity = myResourceTableDao.findById(thePid.getIdAsLong());
|
||||||
if (!entity.isPresent()) {
|
if (!entity.isPresent()) {
|
||||||
throw new ResourceNotFoundException("No resource found with PID " + thePid);
|
throw new ResourceNotFoundException("No resource found with PID " + thePid);
|
||||||
}
|
}
|
||||||
if (entity.get().getDeleted() != null) {
|
if (entity.get().getDeleted() != null && !theDeletedOk) {
|
||||||
throw createResourceGoneException(entity.get());
|
throw createResourceGoneException(entity.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1495,7 +1502,11 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
|
||||||
String msg = getContext().getLocalizer().getMessage(BaseHapiFhirResourceDao.class, "missingBody");
|
String msg = getContext().getLocalizer().getMessage(BaseHapiFhirResourceDao.class, "missingBody");
|
||||||
throw new InvalidRequestException(msg);
|
throw new InvalidRequestException(msg);
|
||||||
}
|
}
|
||||||
assert theResource.getIdElement().hasIdPart() || isNotBlank(theMatchUrl);
|
if (!theResource.getIdElement().hasIdPart() && isBlank(theMatchUrl)) {
|
||||||
|
String type = myFhirContext.getResourceType(theResource);
|
||||||
|
String msg = myFhirContext.getLocalizer().getMessage(BaseHapiFhirResourceDao.class, "updateWithNoId", type);
|
||||||
|
throw new InvalidRequestException(msg);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Resource updates will modify/update the version of the resource with the new version. This is generally helpful,
|
* Resource updates will modify/update the version of the resource with the new version. This is generally helpful,
|
||||||
|
|
|
@ -38,6 +38,7 @@ import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
|
||||||
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
|
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
|
||||||
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamRegistry;
|
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamRegistry;
|
||||||
import ca.uhn.fhir.parser.DataFormatException;
|
import ca.uhn.fhir.parser.DataFormatException;
|
||||||
|
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
|
||||||
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
|
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
|
||||||
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
|
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
|
||||||
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
|
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
|
||||||
|
@ -560,7 +561,8 @@ public class ResourceReindexingSvcImpl implements IResourceReindexingSvc {
|
||||||
|
|
||||||
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(resourceTable.getResourceType());
|
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(resourceTable.getResourceType());
|
||||||
long expectedVersion = resourceTable.getVersion();
|
long expectedVersion = resourceTable.getVersion();
|
||||||
IBaseResource resource = dao.read(resourceTable.getIdDt().toVersionless(), null, true);
|
IBaseResource resource = dao.readByPid(new ResourcePersistentId(resourceTable.getId()), true);
|
||||||
|
|
||||||
if (resource == null) {
|
if (resource == null) {
|
||||||
throw new InternalErrorException("Could not find resource version " + resourceTable.getIdDt().toUnqualified().getValue() + " in database");
|
throw new InternalErrorException("Could not find resource version " + resourceTable.getIdDt().toUnqualified().getValue() + " in database");
|
||||||
}
|
}
|
||||||
|
|
|
@ -586,6 +586,17 @@ public class FhirResourceDaoR4UpdateTest extends BaseJpaR4Test {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdateWithoutId() {
|
||||||
|
|
||||||
|
Patient p = new Patient();
|
||||||
|
try {
|
||||||
|
myPatientDao.update(p);
|
||||||
|
} catch (InvalidRequestException e) {
|
||||||
|
assertEquals("Can not update resource of type Patient as it has no ID", e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testUpdateConditionalByLastUpdatedWithWrongTimezone() throws Exception {
|
public void testUpdateConditionalByLastUpdatedWithWrongTimezone() throws Exception {
|
||||||
TimeZone def = TimeZone.getDefault();
|
TimeZone def = TimeZone.getDefault();
|
||||||
|
|
|
@ -5,6 +5,7 @@ import ca.uhn.fhir.interceptor.api.IAnonymousInterceptor;
|
||||||
import ca.uhn.fhir.interceptor.api.Pointcut;
|
import ca.uhn.fhir.interceptor.api.Pointcut;
|
||||||
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
|
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
|
||||||
import ca.uhn.fhir.jpa.api.config.DaoConfig;
|
import ca.uhn.fhir.jpa.api.config.DaoConfig;
|
||||||
|
import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
|
||||||
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
|
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
|
||||||
import ca.uhn.fhir.jpa.model.entity.ForcedId;
|
import ca.uhn.fhir.jpa.model.entity.ForcedId;
|
||||||
import ca.uhn.fhir.jpa.model.entity.PartitionablePartitionId;
|
import ca.uhn.fhir.jpa.model.entity.PartitionablePartitionId;
|
||||||
|
@ -68,6 +69,7 @@ import static org.hamcrest.Matchers.containsString;
|
||||||
import static org.hamcrest.Matchers.matchesPattern;
|
import static org.hamcrest.Matchers.matchesPattern;
|
||||||
import static org.hamcrest.Matchers.startsWith;
|
import static org.hamcrest.Matchers.startsWith;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||||
import static org.junit.jupiter.api.Assertions.fail;
|
import static org.junit.jupiter.api.Assertions.fail;
|
||||||
|
@ -2905,6 +2907,20 @@ public class PartitioningSqlR4Test extends BasePartitioningR4Test {
|
||||||
assertEquals(0, countMatches(sql, "PARTITION_ID IS NULL"), sql);
|
assertEquals(0, countMatches(sql, "PARTITION_ID IS NULL"), sql);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReindexPartitionedServer() {
|
||||||
|
IIdType patientIdNull = createPatient(withPartition(null), withActiveTrue());
|
||||||
|
IIdType patientId1 = createPatient(withPartition(1), withActiveTrue());
|
||||||
|
|
||||||
|
myResourceReindexingSvc.markAllResourcesForReindexing();
|
||||||
|
myResourceReindexingSvc.forceReindexingPass();
|
||||||
|
|
||||||
|
runInTransaction(()->{
|
||||||
|
assertNotEquals(BaseHapiFhirDao.INDEX_STATUS_INDEXING_FAILED, myResourceTableDao.findById(patientIdNull.getIdPartAsLong()).get().getIndexStatus());
|
||||||
|
assertNotEquals(BaseHapiFhirDao.INDEX_STATUS_INDEXING_FAILED, myResourceTableDao.findById(patientId1.getIdPartAsLong()).get().getIndexStatus());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPartitionNotify() {
|
public void testPartitionNotify() {
|
||||||
IAnonymousInterceptor interceptor = mock(IAnonymousInterceptor.class);
|
IAnonymousInterceptor interceptor = mock(IAnonymousInterceptor.class);
|
||||||
|
|
|
@ -4,6 +4,7 @@ import ca.uhn.fhir.jpa.api.config.DaoConfig;
|
||||||
import ca.uhn.fhir.jpa.config.BaseConfig;
|
import ca.uhn.fhir.jpa.config.BaseConfig;
|
||||||
import ca.uhn.fhir.jpa.config.TestR4Config;
|
import ca.uhn.fhir.jpa.config.TestR4Config;
|
||||||
import ca.uhn.fhir.jpa.entity.Search;
|
import ca.uhn.fhir.jpa.entity.Search;
|
||||||
|
import ca.uhn.fhir.jpa.model.search.SearchStatusEnum;
|
||||||
import ca.uhn.fhir.rest.api.Constants;
|
import ca.uhn.fhir.rest.api.Constants;
|
||||||
import ca.uhn.fhir.rest.api.PreferReturnEnum;
|
import ca.uhn.fhir.rest.api.PreferReturnEnum;
|
||||||
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
|
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
|
||||||
|
@ -20,8 +21,10 @@ import ca.uhn.fhir.rest.server.interceptor.consent.DelegatingConsentService;
|
||||||
import ca.uhn.fhir.rest.server.interceptor.consent.IConsentContextServices;
|
import ca.uhn.fhir.rest.server.interceptor.consent.IConsentContextServices;
|
||||||
import ca.uhn.fhir.rest.server.interceptor.consent.IConsentService;
|
import ca.uhn.fhir.rest.server.interceptor.consent.IConsentService;
|
||||||
import ca.uhn.fhir.util.BundleUtil;
|
import ca.uhn.fhir.util.BundleUtil;
|
||||||
|
import ca.uhn.fhir.util.StopWatch;
|
||||||
import ca.uhn.fhir.util.UrlUtil;
|
import ca.uhn.fhir.util.UrlUtil;
|
||||||
import com.google.common.base.Charsets;
|
import com.google.common.base.Charsets;
|
||||||
|
import com.google.common.base.Stopwatch;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import org.apache.commons.collections4.ListUtils;
|
import org.apache.commons.collections4.ListUtils;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
|
@ -551,6 +554,10 @@ public class ConsentInterceptorResourceProviderR4Test extends BaseResourceProvid
|
||||||
myClient.create().resource(new Patient().setGender(Enumerations.AdministrativeGender.MALE).addName(new HumanName().setFamily("2"))).execute();
|
myClient.create().resource(new Patient().setGender(Enumerations.AdministrativeGender.MALE).addName(new HumanName().setFamily("2"))).execute();
|
||||||
myClient.create().resource(new Patient().setGender(Enumerations.AdministrativeGender.FEMALE).addName(new HumanName().setFamily("3"))).execute();
|
myClient.create().resource(new Patient().setGender(Enumerations.AdministrativeGender.FEMALE).addName(new HumanName().setFamily("3"))).execute();
|
||||||
|
|
||||||
|
runInTransaction(()->{
|
||||||
|
assertEquals(3, myResourceTableDao.count());
|
||||||
|
});
|
||||||
|
|
||||||
Bundle response = myClient.search().forResource(Patient.class).count(1).returnBundle(Bundle.class).execute();
|
Bundle response = myClient.search().forResource(Patient.class).count(1).returnBundle(Bundle.class).execute();
|
||||||
String searchId = response.getId();
|
String searchId = response.getId();
|
||||||
|
|
||||||
|
@ -563,6 +570,20 @@ public class ConsentInterceptorResourceProviderR4Test extends BaseResourceProvid
|
||||||
assertEquals(1, response.getEntry().size());
|
assertEquals(1, response.getEntry().size());
|
||||||
assertNull(response.getTotalElement().getValue());
|
assertNull(response.getTotalElement().getValue());
|
||||||
|
|
||||||
|
StopWatch sw = new StopWatch();
|
||||||
|
while(true) {
|
||||||
|
SearchStatusEnum status = runInTransaction(() -> {
|
||||||
|
Search search = mySearchEntityDao.findByUuidAndFetchIncludes(searchId).orElseThrow(() -> new IllegalStateException());
|
||||||
|
return search.getStatus();
|
||||||
|
});
|
||||||
|
if (status == SearchStatusEnum.FINISHED) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (sw.getMillis() > 60000) {
|
||||||
|
fail("Status is still " + status);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
runInTransaction(() -> {
|
runInTransaction(() -> {
|
||||||
Search search = mySearchEntityDao.findByUuidAndFetchIncludes(searchId).orElseThrow(() -> new IllegalStateException());
|
Search search = mySearchEntityDao.findByUuidAndFetchIncludes(searchId).orElseThrow(() -> new IllegalStateException());
|
||||||
assertEquals(3, search.getNumFound());
|
assertEquals(3, search.getNumFound());
|
||||||
|
|
|
@ -14,6 +14,7 @@ import ca.uhn.fhir.jpa.entity.ResourceReindexJobEntity;
|
||||||
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
|
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
|
||||||
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
|
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
|
||||||
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamRegistry;
|
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamRegistry;
|
||||||
|
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
|
||||||
import org.apache.commons.lang3.time.DateUtils;
|
import org.apache.commons.lang3.time.DateUtils;
|
||||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||||
import org.hl7.fhir.instance.model.api.IIdType;
|
import org.hl7.fhir.instance.model.api.IIdType;
|
||||||
|
@ -250,9 +251,9 @@ public class ResourceReindexingSvcImplTest extends BaseJpaTest {
|
||||||
when(myDaoRegistry.getResourceDao(eq(Patient.class))).thenReturn(myResourceDao);
|
when(myDaoRegistry.getResourceDao(eq(Patient.class))).thenReturn(myResourceDao);
|
||||||
when(myDaoRegistry.getResourceDao(eq("Observation"))).thenReturn(myResourceDao);
|
when(myDaoRegistry.getResourceDao(eq("Observation"))).thenReturn(myResourceDao);
|
||||||
when(myDaoRegistry.getResourceDao(eq(Observation.class))).thenReturn(myResourceDao);
|
when(myDaoRegistry.getResourceDao(eq(Observation.class))).thenReturn(myResourceDao);
|
||||||
when(myResourceDao.read(any(), any(), anyBoolean())).thenAnswer(t->{
|
when(myResourceDao.readByPid(any(), anyBoolean())).thenAnswer(t->{
|
||||||
IIdType id = (IIdType) t.getArguments()[0];
|
int idx = t.getArgument(0, ResourcePersistentId.class).getIdAsLong().intValue();
|
||||||
return resources.get(id.getIdPartAsLong().intValue());
|
return resources.get(idx);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
|
@ -297,7 +298,7 @@ public class ResourceReindexingSvcImplTest extends BaseJpaTest {
|
||||||
when(myDaoRegistry.getResourceDao(eq(Patient.class))).thenReturn(myResourceDao);
|
when(myDaoRegistry.getResourceDao(eq(Patient.class))).thenReturn(myResourceDao);
|
||||||
when(myDaoRegistry.getResourceDao(eq("Observation"))).thenReturn(myResourceDao);
|
when(myDaoRegistry.getResourceDao(eq("Observation"))).thenReturn(myResourceDao);
|
||||||
when(myDaoRegistry.getResourceDao(eq(Observation.class))).thenReturn(myResourceDao);
|
when(myDaoRegistry.getResourceDao(eq(Observation.class))).thenReturn(myResourceDao);
|
||||||
when(myResourceDao.read(any(), any(), anyBoolean())).thenReturn(null);
|
when(myResourceDao.readByPid(any(), anyBoolean())).thenReturn(null);
|
||||||
|
|
||||||
|
|
||||||
int count = mySvc.forceReindexingPass();
|
int count = mySvc.forceReindexingPass();
|
||||||
|
@ -360,9 +361,9 @@ public class ResourceReindexingSvcImplTest extends BaseJpaTest {
|
||||||
when(myDaoRegistry.getResourceDao(eq(Patient.class))).thenReturn(myResourceDao);
|
when(myDaoRegistry.getResourceDao(eq(Patient.class))).thenReturn(myResourceDao);
|
||||||
when(myDaoRegistry.getResourceDao(eq("Observation"))).thenReturn(myResourceDao);
|
when(myDaoRegistry.getResourceDao(eq("Observation"))).thenReturn(myResourceDao);
|
||||||
when(myDaoRegistry.getResourceDao(eq(Observation.class))).thenReturn(myResourceDao);
|
when(myDaoRegistry.getResourceDao(eq(Observation.class))).thenReturn(myResourceDao);
|
||||||
when(myResourceDao.read(any(), any(), anyBoolean())).thenAnswer(t->{
|
when(myResourceDao.readByPid(any(), anyBoolean())).thenAnswer(t->{
|
||||||
IIdType id = (IIdType) t.getArguments()[0];
|
int idx = t.getArgument(0, ResourcePersistentId.class).getIdAsLong().intValue();
|
||||||
return resources.get(id.getIdPartAsLong().intValue());
|
return resources.get(idx);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue