Don't create a new version when reindexing resources

This commit is contained in:
jamesagnew 2015-11-04 08:26:33 -05:00
parent f13820b627
commit 4545b4fb7f
9 changed files with 81 additions and 37 deletions

View File

@ -842,6 +842,11 @@ public abstract class BaseHapiFhirResourceDao<T extends IResource> extends BaseH
ourLog.info("Processed remove tag {}/{} on {} in {}ms", new Object[] { theScheme, theTerm, theId.getValue(), w.getMillisAndRestart() });
}
@Override
public void reindex(T theResource, ResourceTable theEntity) {
updateEntity(theResource, theEntity, false, null, true, false, theEntity.getUpdatedDate());
}
@Override
public IBundleProvider search(Map<String, IQueryParameterType> theParams) {
SearchParameterMap map = new SearchParameterMap();

View File

@ -43,18 +43,17 @@ import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
import ca.uhn.fhir.jpa.entity.ResourceTable;
import ca.uhn.fhir.jpa.util.ReindexFailureException;
import ca.uhn.fhir.jpa.util.StopWatch;
import ca.uhn.fhir.model.api.TagList;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.model.primitive.InstantDt;
import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import ca.uhn.fhir.rest.server.IBundleProvider;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor.ActionRequestDetails;
public abstract class BaseHapiFhirSystemDao<T> extends BaseHapiFhirDao<IBaseResource>implements IFhirSystemDao<T> {
public abstract class BaseHapiFhirSystemDao<T> extends BaseHapiFhirDao<IBaseResource> implements IFhirSystemDao<T> {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(BaseHapiFhirSystemDao.class);
@ -97,36 +96,23 @@ public abstract class BaseHapiFhirSystemDao<T> extends BaseHapiFhirDao<IBaseReso
long start = System.currentTimeMillis();
for (ResourceTable resourceTable : resources) {
final IBaseResource resource;
try {
resource = toResource(resourceTable, false);
} catch (DataFormatException e) {
ourLog.warn("Failure parsing resource: {}", e.toString());
throw new UnprocessableEntityException(Long.toString(resourceTable.getId()));
}
@SuppressWarnings("rawtypes")
final IFhirResourceDao dao = getDao(resource.getClass());
if (dao == null) {
ourLog.warn("No DAO for type: {}", resource.getClass());
throw new UnprocessableEntityException(Long.toString(resourceTable.getId()));
}
final IBaseResource resource = toResource(resourceTable, false);
if (resource.getIdElement().isIdPartValid() == false) {
ourLog.warn("Not going to try and index an invalid ID: {}", resource.getIdElement());
throw new UnprocessableEntityException(Long.toString(resourceTable.getId()));
}
@SuppressWarnings("rawtypes")
final IFhirResourceDao dao = getDao(resource.getClass());
try {
dao.update(resource, null, true);
dao.reindex(resource, resourceTable);
} catch (Exception e) {
ourLog.error("Failed to index resource {}: {}", new Object[] { resource.getIdElement(), e.toString(), e });
throw new UnprocessableEntityException(Long.toString(resourceTable.getId()));
ourLog.error("Failed to index resource {}: {}", new Object[] { resourceTable.getIdDt(), e.toString(), e });
throw new ReindexFailureException(resourceTable.getId());
}
count++;
}
long delay = System.currentTimeMillis() - start;
ourLog.info("Indexed {} / {} resources in {}ms", new Object[] { count, resources.size(), delay });
long avg = (delay / resources.size());
ourLog.info("Indexed {} / {} resources in {}ms - Avg {}ms / resource", new Object[] { count, resources.size(), delay, avg });
return resources.size();
}
@ -207,12 +193,13 @@ public abstract class BaseHapiFhirSystemDao<T> extends BaseHapiFhirDao<IBaseReso
}
@Override
@Transactional(propagation=Propagation.NOT_SUPPORTED)
@Transactional(propagation = Propagation.NOT_SUPPORTED)
public int performReindexingPass(final Integer theCount) {
try {
return doPerformReindexingPass(theCount);
} catch (UnprocessableEntityException e) {
markResourceAsIndexingFailed(Long.parseLong(e.getMessage()));
} catch (ReindexFailureException e) {
ourLog.warn("Reindexing failed for resource {}", e.getResourceId());
markResourceAsIndexingFailed(e.getResourceId());
return -1;
}
}

View File

@ -46,8 +46,13 @@ public class FhirResourceDaoSearchParameterDstu2 extends FhirResourceDaoDstu2<Se
}
int count = mySystemDao.performReindexingPass(100);
for (int i = 0; i < 10 && count > 0; i++) {
count = mySystemDao.performReindexingPass(100);
for (int i = 0; i < 50 && count > 0; i++) {
count = mySystemDao.performReindexingPass(100);
try {
Thread.sleep(DateUtils.MILLIS_PER_SECOND);
} catch (InterruptedException e) {
break;
}
}
}

View File

@ -30,6 +30,7 @@ import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import ca.uhn.fhir.jpa.entity.BaseHasResource;
import ca.uhn.fhir.jpa.entity.ResourceTable;
import ca.uhn.fhir.jpa.entity.TagTypeEnum;
import ca.uhn.fhir.model.api.IQueryParameterType;
import ca.uhn.fhir.model.api.TagList;
@ -60,6 +61,11 @@ public interface IFhirResourceDao<T extends IBaseResource> extends IDao {
DaoMethodOutcome deleteByUrl(String theString);
/**
* @param theTransaction Is this being called in a bundle? If so, don't throw an exception if no matches
*/
DaoMethodOutcome deleteByUrl(String theUrl, boolean theTransaction);
TagList getAllResourceTags();
Class<T> getResourceType();
@ -112,6 +118,12 @@ public interface IFhirResourceDao<T extends IBaseResource> extends IDao {
*/
BaseHasResource readEntity(IIdType theId, boolean theCheckForForcedId);
/**
* Updates index tables associated with the given resource. Does not create a new
* version or update the resource's update time.
*/
void reindex(T theResource, ResourceTable theEntity);
void removeTag(IIdType theId, TagTypeEnum theTagType, String theScheme, String theTerm);
IBundleProvider search(Map<String, IQueryParameterType> theParams);
@ -142,11 +154,6 @@ public interface IFhirResourceDao<T extends IBaseResource> extends IDao {
*/
MethodOutcome validate(T theResource, IIdType theId, String theRawResource, EncodingEnum theEncoding, ValidationModeEnum theMode, String theProfile);
/**
* @param theTransaction Is this being called in a bundle? If so, don't throw an exception if no matches
*/
DaoMethodOutcome deleteByUrl(String theUrl, boolean theTransaction);
// /**
// * Invoke the everything operation
// */

View File

@ -121,6 +121,10 @@ public abstract class BaseHasResource {
return new InstantDt(myUpdated);
}
public Date getUpdatedDate() {
return myUpdated;
}
public abstract long getVersion();
public boolean isHasTags() {

View File

@ -0,0 +1,16 @@
package ca.uhn.fhir.jpa.util;
public class ReindexFailureException extends RuntimeException {
private static final long serialVersionUID = 1L;
private Long myResourceId;
public ReindexFailureException(Long theResourceId) {
myResourceId = theResourceId;
}
public Long getResourceId() {
return myResourceId;
}
}

View File

@ -22,7 +22,6 @@ import javax.persistence.TypedQuery;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
@ -35,6 +34,7 @@ import ca.uhn.fhir.model.dstu2.resource.Subscription;
import ca.uhn.fhir.model.dstu2.valueset.ObservationStatusEnum;
import ca.uhn.fhir.model.dstu2.valueset.SubscriptionChannelTypeEnum;
import ca.uhn.fhir.model.dstu2.valueset.SubscriptionStatusEnum;
import ca.uhn.fhir.rest.server.IBundleProvider;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
@ -375,12 +375,30 @@ public class FhirResourceDaoDstu2SubscriptionTest extends BaseJpaDstu2Test {
resultIds = toUnqualifiedVersionlessIds(results);
assertThat(resultIds, empty());
/*
* Make sure that reindexing doesn't trigger
*/
mySystemDao.markAllResourcesForReindexing();
mySystemDao.performReindexingPass(100);
assertEquals(0, mySubscriptionDao.pollForNewUndeliveredResources());
/*
* Update resources on disk
*/
IBundleProvider allObs = myObservationDao.search(new SearchParameterMap());
ourLog.info("Updating {} observations", allObs.size());
for (IBaseResource next : allObs.getResources(0, allObs.size())) {
ourLog.info("Updating observation");
Observation nextObs = (Observation) next;
nextObs.addPerformer().setDisplay("Some display");
myObservationDao.update(nextObs);
}
assertEquals(6, mySubscriptionDao.pollForNewUndeliveredResources());
assertEquals(0, mySubscriptionDao.pollForNewUndeliveredResources());
}

View File

@ -70,7 +70,7 @@ public class FhirSystemDaoDstu2Test extends BaseJpaDstu2SystemTest {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(FhirSystemDaoDstu2Test.class);
@Test
public void testRendexing() {
public void testReindexing() {
Patient p = new Patient();
p.addName().addFamily("family");
final IIdType id = myPatientDao.create(p).getId().toUnqualifiedVersionless();

View File

@ -1,5 +1,6 @@
package ca.uhn.fhirtest.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor;
@ -11,6 +12,7 @@ public class CommonConfig {
/**
* Do some fancy logging to create a nice access log that has details about each incoming request.
*/
@Bean
public IServerInterceptor loggingInterceptor() {
LoggingInterceptor retVal = new LoggingInterceptor();
retVal.setLoggerName("fhirtest.access");