Cleanup and some tests (#5609)

Clean up warnings and add some missing tests.
Michael Buckley 2024-01-19 12:42:50 -05:00 committed by GitHub
parent bb99d3bf9a
commit 6a370c60cb
16 changed files with 152 additions and 68 deletions

View File

@ -38,6 +38,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
@ -134,6 +135,9 @@ public class RequestPartitionId implements IModelJson {
if (hasPartitionNames()) {
b.append("names", getPartitionNames());
}
if (myAllPartitions) {
b.append("allPartitions", myAllPartitions);
}
return b.build();
}
@ -216,7 +220,7 @@ public class RequestPartitionId implements IModelJson {
}
public List<Integer> getPartitionIdsWithoutDefault() {
return getPartitionIds().stream().filter(t -> t != null).collect(Collectors.toList());
return getPartitionIds().stream().filter(Objects::nonNull).collect(Collectors.toList());
}
@Nullable

View File

@ -20,10 +20,13 @@ package ca.uhn.fhir.util;
* #L%
*/
import jakarta.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;
/**
* This utility takes an input collection, breaks it up into chunks of a
@ -49,4 +52,9 @@ public class TaskChunker<T> {
theBatchConsumer.accept(batch);
}
}
@Nonnull
public <T> Stream<List<T>> chunk(Stream<T> theStream, int theChunkSize) {
return StreamUtil.partition(theStream, theChunkSize);
}
}
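
A minimal usage sketch of the chunker may help here. It assumes the three-argument chunk(Collection, int, Consumer) overload shown partially above is callable on an instance, and the 500-element chunk size is an arbitrary example value; the example class is not part of this commit.

import ca.uhn.fhir.util.TaskChunker;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Illustrative caller of TaskChunker; not part of this commit.
public class TaskChunkerExample {
    public static void main(String[] args) {
        List<Integer> ids = IntStream.rangeClosed(1, 1200).boxed().collect(Collectors.toList());

        // Existing collection form: the consumer sees at most 500 ids per invocation.
        new TaskChunker<Integer>().chunk(ids, 500, batch -> System.out.println("batch of " + batch.size()));

        // New stream form from this commit: batches are produced lazily via StreamUtil.partition.
        try (Stream<List<Integer>> batches = new TaskChunker<Integer>().chunk(ids.stream(), 500)) {
            batches.forEach(batch -> System.out.println("lazy batch of " + batch.size()));
        }
    }
}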

View File

@ -1540,7 +1540,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
.withRequest(theRequest)
.withTransactionDetails(transactionDetails)
.withRequestPartitionId(requestPartitionId)
.execute(() -> doReadInTransaction(theId, theRequest, theDeletedOk, requestPartitionId));
.read(() -> doReadInTransaction(theId, theRequest, theDeletedOk, requestPartitionId));
}
private T doReadInTransaction(

View File

@ -158,7 +158,8 @@ public class IdHelperService implements IIdHelperService<JpaPid> {
String theResourceId,
boolean theExcludeDeleted)
throws ResourceNotFoundException {
assert myDontCheckActiveTransactionForUnitTest || TransactionSynchronizationManager.isSynchronizationActive();
assert myDontCheckActiveTransactionForUnitTest || TransactionSynchronizationManager.isSynchronizationActive()
: "no transaction active";
if (theResourceId.contains("/")) {
theResourceId = theResourceId.substring(theResourceId.indexOf("/") + 1);

View File

@ -26,6 +26,9 @@ import jakarta.persistence.Id;
import jakarta.persistence.Table;
import jakarta.persistence.UniqueConstraint;
import java.util.ArrayList;
import java.util.List;
@Entity
@Table(
name = "HFJ_PARTITION",
@ -82,4 +85,20 @@ public class PartitionEntity {
public RequestPartitionId toRequestPartitionId() {
return RequestPartitionId.fromPartitionIdAndName(getId(), getName());
}
/**
* Build a RequestPartitionId from the ids and names in the entities.
* @param thePartitions the entities to use for ids and names
* @return a single RequestPartitionId covering all the entities
*/
public static RequestPartitionId buildRequestPartitionId(List<PartitionEntity> thePartitions) {
List<Integer> ids = new ArrayList<>(thePartitions.size());
List<String> names = new ArrayList<>(thePartitions.size());
for (PartitionEntity nextPartition : thePartitions) {
ids.add(nextPartition.getId());
names.add(nextPartition.getName());
}
return RequestPartitionId.forPartitionIdsAndNames(names, ids, null);
}
}

View File

@ -25,6 +25,7 @@ import ca.uhn.fhir.util.TaskChunker;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;
/**
* As always, Oracle can't handle things that other databases don't mind.. In this
@ -37,4 +38,8 @@ public class QueryChunker<T> extends TaskChunker<T> {
public void chunk(Collection<T> theInput, Consumer<List<T>> theBatchConsumer) {
chunk(theInput, SearchBuilder.getMaximumPageSize(), theBatchConsumer);
}
public Stream<List<T>> chunk(Stream<T> theStream) {
return chunk(theStream, SearchBuilder.getMaximumPageSize());
}
}
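
Since the comment above explains that this class exists to keep IN-clause sizes Oracle-safe, a hedged sketch of a caller follows; the class name, the runInClauseQuery placeholder, and the QueryChunker import location are illustrative assumptions.

import ca.uhn.fhir.jpa.util.QueryChunker;

import java.util.Collection;
import java.util.List;
import java.util.stream.Stream;

// Illustrative only; runInClauseQuery stands in for whatever query consumes each id batch.
public class QueryChunkerExample {
    void loadEagerly(Collection<Long> thePids) {
        // Each batch is capped at SearchBuilder.getMaximumPageSize(), keeping IN clauses within Oracle's limits.
        new QueryChunker<Long>().chunk(thePids, this::runInClauseQuery);
    }

    void loadLazily(Stream<Long> thePids) {
        // New in this commit: partition a Stream of ids into page-sized batches without materializing them all.
        try (Stream<List<Long>> pages = new QueryChunker<Long>().chunk(thePids)) {
            pages.forEach(this::runInClauseQuery);
        }
    }

    private void runInClauseQuery(List<Long> theBatch) {
        // placeholder for a query of the form "... WHERE res_id IN (:theBatch)"
    }
}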

View File

@ -0,0 +1,31 @@
package ca.uhn.fhir.jpa.entity;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import org.junit.jupiter.api.Test;
import java.util.List;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
class PartitionEntityTest {
@Test
void buildRequestPartitionFromList() {
// given
List<PartitionEntity> entities = List.of(
new PartitionEntity().setId(1).setName("p1"),
new PartitionEntity().setId(2).setName("p2")
);
// when
RequestPartitionId requestPartitionId = PartitionEntity.buildRequestPartitionId(entities);
// then
assertThat(requestPartitionId, notNullValue());
assertThat(requestPartitionId.getPartitionIds(), equalTo(List.of(1,2)));
assertThat(requestPartitionId.getPartitionNames(), equalTo(List.of("p1","p2")));
}
}

View File

@ -24,6 +24,7 @@ import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionActivatingSubscriber;
import ca.uhn.fhir.rest.param.TokenOrListParam;
import ca.uhn.fhir.rest.param.TokenParam;
import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
import ca.uhn.fhir.subscription.SubscriptionConstants;
import jakarta.annotation.Nonnull;
import org.apache.commons.lang3.StringUtils;
@ -49,6 +50,9 @@ public class SubscriptionLoader extends BaseResourceCacheSynchronizer {
@Autowired
private SubscriptionCanonicalizer mySubscriptionCanonicalizer;
@Autowired
protected ISearchParamRegistry mySearchParamRegistry;
/**
* Constructor
*/
@ -157,11 +161,11 @@ public class SubscriptionLoader extends BaseResourceCacheSynchronizer {
} else {
error = "";
}
ourLog.error("Subscription "
+ theSubscription.getIdElement().getIdPart()
+ " could not be activated."
+ " This will not prevent startup, but it could lead to undesirable outcomes! "
+ (StringUtils.isBlank(error) ? "" : "Error: " + error));
ourLog.error(
"Subscription {} could not be activated."
+ " This will not prevent startup, but it could lead to undesirable outcomes! {}",
theSubscription.getIdElement().getIdPart(),
(StringUtils.isBlank(error) ? "" : "Error: " + error));
}
public void syncSubscriptions() {

View File

@ -25,6 +25,7 @@ import ca.uhn.fhir.context.FhirVersionEnum;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.param.TokenParam;
import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
import ca.uhn.fhir.subscription.SubscriptionConstants;
import ca.uhn.fhir.util.Logs;
import jakarta.annotation.Nonnull;
@ -47,6 +48,9 @@ public class SubscriptionTopicLoader extends BaseResourceCacheSynchronizer {
@Autowired
private SubscriptionTopicRegistry mySubscriptionTopicRegistry;
@Autowired
protected ISearchParamRegistry mySearchParamRegistry;
/**
* Constructor
*/

View File

@ -13,27 +13,24 @@ import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.SimpleBundleProvider;
import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
import ca.uhn.fhir.subscription.SubscriptionConstants;
import ca.uhn.test.util.LogbackCaptureTestExtension;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Appender;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.Subscription;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasItem;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
@ -43,13 +40,11 @@ import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class SubscriptionLoaderTest {
private Logger ourLogger;
@Spy
private FhirContext myFhirContext = FhirContext.forR4Cached();
@Mock
private Appender<ILoggingEvent> myAppender;
@RegisterExtension
LogbackCaptureTestExtension myLogCapture = new LogbackCaptureTestExtension(SubscriptionLoader.class);
@Mock
private SubscriptionRegistry mySubscriptionRegistry;
@ -81,15 +76,8 @@ public class SubscriptionLoaderTest {
@InjectMocks
private SubscriptionLoader mySubscriptionLoader;
private Level myStoredLogLevel;
@BeforeEach
public void init() {
ourLogger = (Logger) LoggerFactory.getLogger(SubscriptionLoader.class);
myStoredLogLevel = ourLogger.getLevel();
ourLogger.addAppender(myAppender);
when(myResourceChangeListenerRegistry.registerResourceResourceChangeListener(
anyString(),
any(SearchParameterMap.class),
@ -102,12 +90,6 @@ public class SubscriptionLoaderTest {
mySubscriptionLoader.registerListener();
}
@AfterEach
public void end() {
ourLogger.detachAppender(myAppender);
ourLogger.setLevel(myStoredLogLevel);
}
private IBundleProvider getSubscriptionList(List<IBaseResource> theReturnedResource) {
IBundleProvider subscriptionList = new SimpleBundleProvider(theReturnedResource);
@ -122,26 +104,24 @@ public class SubscriptionLoaderTest {
subscription.setId("Subscription/123");
subscription.setError("THIS IS AN ERROR");
ourLogger.setLevel(Level.ERROR);
// when
when(myDaoRegistry.getResourceDao("Subscription"))
.thenReturn(mySubscriptionDao);
.thenReturn(mySubscriptionDao);
when(myDaoRegistry.isResourceTypeSupported("Subscription"))
.thenReturn(true);
.thenReturn(true);
when(mySubscriptionDao.search(any(SearchParameterMap.class), any(SystemRequestDetails.class)))
.thenReturn(getSubscriptionList(
Collections.singletonList(subscription)
));
when(mySubscriptionActivatingInterceptor.activateSubscriptionIfRequired(any(IBaseResource.class)))
.thenReturn(false);
.thenReturn(false);
when(mySubscriptionActivatingInterceptor.isChannelTypeSupported(any(IBaseResource.class)))
.thenReturn(true);
when(mySubscriptionCanonicalizer.getSubscriptionStatus(any())).thenReturn(SubscriptionConstants.REQUESTED_STATUS);
// test
mySubscriptionLoader.syncDatabaseToCache();
@ -149,15 +129,10 @@ public class SubscriptionLoaderTest {
verify(mySubscriptionDao)
.search(any(SearchParameterMap.class), any(SystemRequestDetails.class));
ArgumentCaptor<ILoggingEvent> eventCaptor = ArgumentCaptor.forClass(ILoggingEvent.class);
verify(myAppender).doAppend(eventCaptor.capture());
String actual = "Subscription "
String expected = "Subscription "
+ subscription.getIdElement().getIdPart()
+ " could not be activated.";
String msg = eventCaptor.getValue().getMessage();
Assertions.assertTrue(msg
.contains(actual),
String.format("Expected %s, actual %s", msg, actual));
Assertions.assertTrue(msg.contains(subscription.getError()));
assertThat(myLogCapture.getLogEvents(), hasItem(LogbackCaptureTestExtension.eventWithLevelAndMessageContains(Level.ERROR, expected)));
assertThat(myLogCapture.getLogEvents(), hasItem(LogbackCaptureTestExtension.eventWithMessageContains(subscription.getError())));
}
}

View File

@ -44,6 +44,7 @@ public class ReindexStepTest {
ReindexJobParameters reindexJobParameters = new ReindexJobParameters();
reindexJobParameters.setRequestPartitionId(RequestPartitionId.fromPartitionId(expectedPartitionId));
when(myHapiTransactionService.withRequest(any())).thenCallRealMethod();
when(myHapiTransactionService.buildExecutionBuilder(any())).thenCallRealMethod();
// when
myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", reindexJobParameters);

View File

@ -29,7 +29,6 @@ import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.searchparam.retry.Retrier;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
import ca.uhn.fhir.subscription.SubscriptionConstants;
import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.Nonnull;
@ -56,9 +55,6 @@ public abstract class BaseResourceCacheSynchronizer implements IResourceChangeLi
public static final long REFRESH_INTERVAL = DateUtils.MILLIS_PER_MINUTE;
private final String myResourceName;
@Autowired
protected ISearchParamRegistry mySearchParamRegistry;
@Autowired
private IResourceChangeListenerRegistry myResourceChangeListenerRegistry;
@ -71,10 +67,19 @@ public abstract class BaseResourceCacheSynchronizer implements IResourceChangeLi
private final Semaphore mySyncResourcesSemaphore = new Semaphore(1);
private final Object mySyncResourcesLock = new Object();
public BaseResourceCacheSynchronizer(String theResourceName) {
protected BaseResourceCacheSynchronizer(String theResourceName) {
myResourceName = theResourceName;
}
protected BaseResourceCacheSynchronizer(
String theResourceName,
IResourceChangeListenerRegistry theResourceChangeListenerRegistry,
DaoRegistry theDaoRegistry) {
myResourceName = theResourceName;
myDaoRegistry = theDaoRegistry;
myResourceChangeListenerRegistry = theResourceChangeListenerRegistry;
}
@PostConstruct
public void registerListener() {
if (myDaoRegistry.getResourceDaoOrNull(myResourceName) == null) {

View File

@ -374,7 +374,7 @@ public interface IFhirResourceDao<T extends IBaseResource> extends IDao {
* The Stream MUST be closed to avoid leaking resources.
* @param theParams the search
* @param theRequest for partition target info
* @return a Stream than MUST only be used within the calling transaction.
* @return a Stream that MUST only be used within the calling transaction.
*/
default <PID extends IResourcePersistentId<?>> Stream<PID> searchForIdStream(
SearchParameterMap theParams,
@ -384,6 +384,11 @@ public interface IFhirResourceDao<T extends IBaseResource> extends IDao {
return iResourcePersistentIds.stream();
}
default <PID extends IResourcePersistentId<?>> Stream<PID> searchForIdStream(
SearchParameterMap theParams, RequestDetails theRequest) {
return searchForIdStream(theParams, theRequest, null);
}
/**
* Takes a map of incoming raw search parameters and translates/parses them into
* appropriate {@link IQueryParameterType} instances of the appropriate type

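Because the javadoc above requires the returned Stream to be closed and consumed inside the calling transaction, a short usage sketch may help; the DAO field, the JpaPid type argument, and the import paths are assumptions for illustration, not part of this commit.

import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.model.dao.JpaPid;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import org.hl7.fhir.r4.model.Patient;

import java.util.stream.Stream;

// Illustrative sketch: the DAO is normally injected, and the call must run inside an active transaction.
public class IdStreamExample {
    private IFhirResourceDao<Patient> myPatientDao;

    long countPatientPids() {
        SearchParameterMap params = SearchParameterMap.newSynchronous();
        // The new two-argument overload simply forwards a null final argument to the existing method.
        try (Stream<JpaPid> pids = myPatientDao.searchForIdStream(params, new SystemRequestDetails())) {
            return pids.count();
        }
    }
}
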
View File

@ -104,12 +104,16 @@ public class HapiTransactionService implements IHapiTransactionService {
@Override
public IExecutionBuilder withRequest(@Nullable RequestDetails theRequestDetails) {
return new ExecutionBuilder(theRequestDetails);
return buildExecutionBuilder(theRequestDetails);
}
@Override
public IExecutionBuilder withSystemRequest() {
return new ExecutionBuilder(null);
return buildExecutionBuilder(null);
}
protected IExecutionBuilder buildExecutionBuilder(@Nullable RequestDetails theRequestDetails) {
return new ExecutionBuilder(theRequestDetails);
}
/**
@ -236,15 +240,7 @@ public class HapiTransactionService implements IHapiTransactionService {
@Nullable
protected <T> T doExecute(ExecutionBuilder theExecutionBuilder, TransactionCallback<T> theCallback) {
final RequestPartitionId requestPartitionId;
if (theExecutionBuilder.myRequestPartitionId != null) {
requestPartitionId = theExecutionBuilder.myRequestPartitionId;
} else if (theExecutionBuilder.myRequestDetails != null) {
requestPartitionId = myRequestPartitionHelperSvc.determineGenericPartitionForRequest(
theExecutionBuilder.myRequestDetails);
} else {
requestPartitionId = null;
}
final RequestPartitionId requestPartitionId = theExecutionBuilder.getEffectiveRequestPartitionId();
RequestPartitionId previousRequestPartitionId = null;
if (requestPartitionId != null) {
previousRequestPartitionId = ourRequestPartitionThreadLocal.get();
@ -443,7 +439,8 @@ public class HapiTransactionService implements IHapiTransactionService {
}
}
protected class ExecutionBuilder implements IExecutionBuilder, TransactionOperations {
// wipmb is Clone ok, or do we want an explicit copy constructor?
protected class ExecutionBuilder implements IExecutionBuilder, TransactionOperations, Cloneable {
private final RequestDetails myRequestDetails;
private Isolation myIsolation;
@ -451,7 +448,7 @@ public class HapiTransactionService implements IHapiTransactionService {
private boolean myReadOnly;
private TransactionDetails myTransactionDetails;
private Runnable myOnRollback;
private RequestPartitionId myRequestPartitionId;
protected RequestPartitionId myRequestPartitionId;
protected ExecutionBuilder(RequestDetails theRequestDetails) {
myRequestDetails = theRequestDetails;
@ -533,6 +530,19 @@ public class HapiTransactionService implements IHapiTransactionService {
public Propagation getPropagation() {
return myPropagation;
}
@Nullable
protected RequestPartitionId getEffectiveRequestPartitionId() {
final RequestPartitionId requestPartitionId;
if (myRequestPartitionId != null) {
requestPartitionId = myRequestPartitionId;
} else if (myRequestDetails != null) {
requestPartitionId = myRequestPartitionHelperSvc.determineGenericPartitionForRequest(myRequestDetails);
} else {
requestPartitionId = null;
}
return requestPartitionId;
}
}
/**

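Turning builder construction into a protected buildExecutionBuilder factory (together with the now-protected myRequestPartitionId field and the Cloneable marker) gives subclasses and tests a seam; the ReindexStepTest change above stubs it with thenCallRealMethod(). Below is a hedged sketch of a subclass using the seam purely for tracing; the class name, the logging, and the HapiTransactionService import path are assumptions.

import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative subclass: observes builder creation through the new protected factory method.
public class TracingTransactionService extends HapiTransactionService {
    private static final Logger ourLog = LoggerFactory.getLogger(TracingTransactionService.class);

    @Override
    protected IExecutionBuilder buildExecutionBuilder(RequestDetails theRequestDetails) {
        ourLog.trace("Building transaction execution builder, request={}", theRequestDetails);
        return super.buildExecutionBuilder(theRequestDetails);
    }
}
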
View File

@ -31,6 +31,7 @@ import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionOperations;
import java.util.concurrent.Callable;
import java.util.stream.Stream;
/**
* This class is used to execute code within the context of a database transaction,
@ -107,5 +108,13 @@ public interface IHapiTransactionService {
<T> T execute(Callable<T> theTask);
<T> T execute(@Nonnull TransactionCallback<T> callback);
default <T> T read(Callable<T> theCallback) {
return execute(theCallback);
}
default <T> Stream<T> search(Callable<Stream<T>> theCallback) {
return execute(theCallback);
}
}
}
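
The new read and search defaults are thin aliases for execute; the BaseHapiFhirResourceDao hunk near the top of this commit shows the intended call shape. A minimal sketch of a caller follows, assuming the usual builder wiring; the example class, the loadPatient helper, and the IHapiTransactionService import path are illustrative.

import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import org.hl7.fhir.r4.model.Patient;

// Illustrative caller: mirrors the .read(...) call introduced in BaseHapiFhirResourceDao above.
public class TransactionReadExample {
    private IHapiTransactionService myTransactionService; // normally injected by Spring

    Patient readInTransaction(RequestDetails theRequest, RequestPartitionId thePartitionId) {
        return myTransactionService
                .withRequest(theRequest)
                .withRequestPartitionId(thePartitionId)
                // read(...) delegates to execute(...) and documents read-only intent at the call site
                .read(this::loadPatient);
    }

    private Patient loadPatient() {
        // placeholder for the persistence work performed inside the transaction
        return new Patient();
    }
}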

View File

@ -23,13 +23,13 @@ import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import jakarta.annotation.Nonnull;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.slf4j.LoggerFactory;
import jakarta.annotation.Nonnull;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
@ -37,7 +37,6 @@ import java.util.stream.Collectors;
/**
* Test helper to collect logback lines.
*
* The empty constructor will capture all log events, or you can name a log root to limit the noise.
*/
public class LogbackCaptureTestExtension implements BeforeEachCallback, AfterEachCallback {
@ -83,7 +82,11 @@ public class LogbackCaptureTestExtension implements BeforeEachCallback, AfterEac
this((Logger) LoggerFactory.getLogger(theLoggerName), theLevel);
}
/**
public LogbackCaptureTestExtension(Class<?> theClass) {
this(theClass.getName());
}
/**
* Returns a copy to avoid concurrent modification errors.
* @return A copy of the log events so far.
*/
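
The new Class-based constructor matches how SubscriptionLoaderTest above now captures log output. A minimal self-contained sketch of the pattern follows, with an illustrative test class and logger.

import ca.uhn.test.util.LogbackCaptureTestExtension;
import ch.qos.logback.classic.Level;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasItem;

// Illustrative test: captures events from a single logger selected by class, as SubscriptionLoaderTest does.
class WidgetServiceLogTest {
    private static final Logger ourLog = LoggerFactory.getLogger(WidgetServiceLogTest.class);

    @RegisterExtension
    LogbackCaptureTestExtension myLogCapture = new LogbackCaptureTestExtension(WidgetServiceLogTest.class);

    @Test
    void errorIsLogged() {
        ourLog.error("boom: {}", "details");

        assertThat(myLogCapture.getLogEvents(),
                hasItem(LogbackCaptureTestExtension.eventWithLevelAndMessageContains(Level.ERROR, "boom")));
    }
}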