Merge branch 'master' into jm-generate-ddl-from-parent

This commit is contained in:
juan.marchionatto 2024-08-11 17:47:49 -04:00
commit 4dcf933f05
7 changed files with 136 additions and 11 deletions

View File

@ -2515,6 +2515,22 @@ public enum Pointcut implements IPointcut {
MDM_SUBMIT(
void.class, "ca.uhn.fhir.rest.api.server.RequestDetails", "ca.uhn.fhir.mdm.model.mdmevents.MdmSubmitEvent"),
/**
* <b>MDM_SUBMIT_PRE_MESSAGE_DELIVERY Hook:</b>
* Invoked immediately before the delivery of a MESSAGE to the broker.
* <p>
* Hooks can make changes to the delivery payload.
* Furthermore, modification can be made to the outgoing message,
* for example adding headers or changing message key,
* which will be used for the subsequent processing.
* </p>
* Hooks should accept the following parameters:
* <ul>
* <li>ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage</li>
* </ul>
*/
MDM_SUBMIT_PRE_MESSAGE_DELIVERY(void.class, "ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage"),
/**
* <b>JPA Hook:</b>
* This hook is invoked when a cross-partition reference is about to be

View File

@ -0,0 +1,4 @@
---
type: add
issue: 6182
title: "A new Pointcut called `MDM_SUBMIT_PRE_MESSAGE_DELIVERY` has been added. If you wish to customize the `ResourceModifiedJsonMessage` sent to the broker, you can do so by implementing this Pointcut, and returning `ResourceModifiedJsonMessage`."

View File

@ -1,8 +1,10 @@
package ca.uhn.fhir.jpa.mdm.provider;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.Hook;
import ca.uhn.fhir.interceptor.api.IInterceptorService;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.mdm.log.Logs;
import ca.uhn.fhir.mdm.rules.config.MdmSettings;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
@ -30,6 +32,7 @@ import org.junit.jupiter.params.provider.MethodSource;
import org.springframework.beans.factory.annotation.Autowired;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
@ -245,16 +248,24 @@ public class MdmProviderBatchR4Test extends BaseLinkR4Test {
Patient janePatient = createPatientAndUpdateLinks(buildJanePatient());
Patient janePatient2 = createPatientAndUpdateLinks(buildJanePatient());
assertLinkCount(5);
final AtomicBoolean mdmSubmitBeforeMessageDeliveryHookCalled = new AtomicBoolean();
final Object interceptor = new Object() {
@Hook(Pointcut.MDM_SUBMIT_PRE_MESSAGE_DELIVERY)
void hookMethod(ResourceModifiedJsonMessage theResourceModifiedJsonMessage) {
mdmSubmitBeforeMessageDeliveryHookCalled.set(true);
}
};
myInterceptorService.registerInterceptor(interceptor);
// When
clearMdmLinks();
afterMdmLatch.runWithExpectedCount(3, () -> {
myMdmProvider.mdmBatchPatientType(null , null, theSyncOrAsyncRequest);
});
// Then
assertThat(mdmSubmitBeforeMessageDeliveryHookCalled).isTrue();
updatePatientAndUpdateLinks(janePatient);
updatePatientAndUpdateLinks(janePatient2);
assertLinkCount(3);
myInterceptorService.unregisterInterceptor(interceptor);
}
}

View File

@ -20,6 +20,9 @@
package ca.uhn.fhir.mdm.svc;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
@ -44,9 +47,12 @@ public class MdmChannelSubmitterSvcImpl implements IMdmChannelSubmitterSvc {
private MessageChannel myMdmChannelProducer;
private FhirContext myFhirContext;
private final FhirContext myFhirContext;
private IChannelFactory myChannelFactory;
private final IChannelFactory myChannelFactory;
@Autowired
private IInterceptorBroadcaster myInterceptorBroadcaster;
@Override
public void submitResourceToMdmChannel(IBaseResource theResource) {
@ -59,6 +65,11 @@ public class MdmChannelSubmitterSvcImpl implements IMdmChannelSubmitterSvc {
(RequestPartitionId) theResource.getUserData(Constants.RESOURCE_PARTITION_ID));
resourceModifiedMessage.setOperationType(ResourceModifiedMessage.OperationTypeEnum.MANUALLY_TRIGGERED);
resourceModifiedJsonMessage.setPayload(resourceModifiedMessage);
if (myInterceptorBroadcaster.hasHooks(Pointcut.MDM_SUBMIT_PRE_MESSAGE_DELIVERY)) {
final HookParams params =
new HookParams().add(ResourceModifiedJsonMessage.class, resourceModifiedJsonMessage);
myInterceptorBroadcaster.callHooks(Pointcut.MDM_SUBMIT_PRE_MESSAGE_DELIVERY, params);
}
boolean success = getMdmChannelProducer().send(resourceModifiedJsonMessage);
if (!success) {
ourLog.error("Failed to submit {} to MDM Channel.", resourceModifiedMessage.getPayloadId());

View File

@ -20,6 +20,7 @@
package ca.uhn.fhir.rest.api.server;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.RequestTypeEnum;
@ -30,6 +31,7 @@ import ca.uhn.fhir.util.StopWatch;
import ca.uhn.fhir.util.UrlUtil;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
@ -39,6 +41,7 @@ import java.io.InputStream;
import java.io.Reader;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -50,6 +53,8 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
public abstract class RequestDetails {
public static final byte[] BAD_STREAM_PLACEHOLDER =
(Msg.code(2543) + "PLACEHOLDER WHEN READING FROM BAD STREAM").getBytes(StandardCharsets.UTF_8);
private final StopWatch myRequestStopwatch;
private IInterceptorBroadcaster myInterceptorBroadcaster;
private String myTenantId;
@ -523,9 +528,22 @@ public abstract class RequestDetails {
mySubRequest = theSubRequest;
}
public final byte[] loadRequestContents() {
public final synchronized byte[] loadRequestContents() {
if (myRequestContents == null) {
myRequestContents = getByteStreamRequestContents();
// Initialize the byte array to a non-null value to avoid repeated calls to getByteStreamRequestContents()
// which can occur when getByteStreamRequestContents() throws an Exception
myRequestContents = ArrayUtils.EMPTY_BYTE_ARRAY;
try {
myRequestContents = getByteStreamRequestContents();
} finally {
if (myRequestContents == null) {
// if reading the stream throws an exception, then our contents are still null, but the stream is
// dead.
// Set a placeholder value so nobody tries to read again.
myRequestContents = BAD_STREAM_PLACEHOLDER;
}
}
assert myRequestContents != null : "We must not re-read the stream.";
}
return getRequestContentsIfLoaded();
}

View File

@ -1038,6 +1038,9 @@ public class RestfulServer extends HttpServlet implements IRestfulServer<Servlet
theRequest.setAttribute(SERVLET_CONTEXT_ATTRIBUTE, getServletContext());
// keep track of any unhandled exceptions in case the exception handler throws another exception
Throwable unhandledException = null;
try {
/* ***********************************
@ -1205,6 +1208,8 @@ public class RestfulServer extends HttpServlet implements IRestfulServer<Servlet
} catch (NotModifiedException | AuthenticationException e) {
unhandledException = e;
HookParams handleExceptionParams = new HookParams();
handleExceptionParams.add(RequestDetails.class, requestDetails);
handleExceptionParams.add(ServletRequestDetails.class, requestDetails);
@ -1216,9 +1221,12 @@ public class RestfulServer extends HttpServlet implements IRestfulServer<Servlet
}
writeExceptionToResponse(theResponse, e);
unhandledException = null;
} catch (Throwable e) {
unhandledException = e;
/*
* We have caught an exception during request processing. This might be because a handling method threw
* something they wanted to throw (e.g. UnprocessableEntityException because the request
@ -1285,9 +1293,18 @@ public class RestfulServer extends HttpServlet implements IRestfulServer<Servlet
* If nobody handles it, default behaviour is to stream back the OperationOutcome to the client.
*/
DEFAULT_EXCEPTION_HANDLER.handleException(requestDetails, exception, theRequest, theResponse);
unhandledException = null;
} finally {
if (unhandledException != null) {
ourLog.error(
Msg.code(2544) + "Exception handling threw an exception. Initial exception was: {}",
unhandledException.getMessage(),
unhandledException);
unhandledException = null;
}
HookParams params = new HookParams();
params.add(RequestDetails.class, requestDetails);
params.addIfMatchesType(ServletRequestDetails.class, requestDetails);

View File

@ -1,16 +1,22 @@
package ca.uhn.fhir.rest.server;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.rest.annotation.Create;
import ca.uhn.fhir.rest.annotation.ResourceParam;
import ca.uhn.fhir.rest.annotation.Search;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.rest.api.RequestTypeEnum;
import ca.uhn.fhir.test.utilities.HttpClientExtension;
import ca.uhn.fhir.test.utilities.server.RestfulServerExtension;
import ca.uhn.test.util.LogbackTestExtension;
import ca.uhn.test.util.LogbackTestExtensionAssert;
import com.helger.commons.collection.iterate.EmptyEnumeration;
import com.helger.commons.io.stream.StringInputStream;
import jakarta.annotation.Nonnull;
import jakarta.servlet.ReadListener;
import jakarta.servlet.ServletException;
import jakarta.servlet.ServletInputStream;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
@ -36,10 +42,12 @@ import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import static org.apache.commons.collections.CollectionUtils.isEmpty;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;
@ -52,6 +60,7 @@ public class ServerConcurrencyTest {
@RegisterExtension
private static final RestfulServerExtension ourServer = new RestfulServerExtension(ourCtx)
.registerProvider(new MyPatientProvider());
public static final String SEARCH_TIMEOUT_ERROR = "SEARCH_TIMEOUT_ERROR: Search timed out";
@RegisterExtension
private final HttpClientExtension myHttpClient = new HttpClientExtension();
@ -62,10 +71,12 @@ public class ServerConcurrencyTest {
@Mock
private PrintWriter myWriter;
private HashMap<String, String> myHeaders;
@RegisterExtension
LogbackTestExtension myLogbackTestExtension = new LogbackTestExtension();
@Test
public void testExceptionClosingInputStream() throws IOException {
initRequestMocks();
initRequestMocks("/Patient");
DelegatingServletInputStream inputStream = createMockPatientBodyServletInputStream();
inputStream.setExceptionOnClose(true);
when(myRequest.getInputStream()).thenReturn(inputStream);
@ -78,7 +89,7 @@ public class ServerConcurrencyTest {
@Test
public void testExceptionClosingOutputStream() throws IOException {
initRequestMocks();
initRequestMocks("/Patient");
when(myRequest.getInputStream()).thenReturn(createMockPatientBodyServletInputStream());
when(myResponse.getWriter()).thenReturn(myWriter);
@ -90,12 +101,44 @@ public class ServerConcurrencyTest {
);
}
private void initRequestMocks() {
/**
* Exception thrown during SERVER_OUTGOING_FAILURE_OPERATIONOUTCOME
*/
@Test
void testExceptionThrownDuringExceptionHandler_bothExceptionsLogged() throws ServletException, IOException {
// given
initRequestMocks("/Patient?active=true");
ourServer.getInterceptorService().registerAnonymousInterceptor(Pointcut.SERVER_OUTGOING_FAILURE_OPERATIONOUTCOME, (pointcut,params)->{
throw new RuntimeException("MARKER_2: Exception during exception processing");
});
// when
try {
ourServer.getRestfulServer().handleRequest(RequestTypeEnum.GET, myRequest, myResponse);
} catch (Throwable e) {
// eat any ex that escapes. We need to not depend on default logging.
}
// then
// both exceptions should be logged.
LogbackTestExtensionAssert.assertThat(myLogbackTestExtension).hasErrorMessage("HAPI-2544");
LogbackTestExtensionAssert.assertThat(myLogbackTestExtension).hasErrorMessage(SEARCH_TIMEOUT_ERROR);
}
private void initRequestMocks(String theURL) {
myHeaders = new HashMap<>();
myHeaders.put(Constants.HEADER_CONTENT_TYPE, Constants.CT_FHIR_JSON_NEW);
when(myRequest.getRequestURI()).thenReturn("/Patient");
when(myRequest.getRequestURL()).thenReturn(new StringBuffer(ourServer.getBaseUrl() + "/Patient"));
String relativeUri;
int endOfUri = theURL.indexOf("?");
if (endOfUri >= 0) {
relativeUri = theURL.substring(0,endOfUri);
} else {
relativeUri = theURL;
}
when(myRequest.getRequestURI()).thenReturn(relativeUri);
when(myRequest.getRequestURL()).thenReturn(new StringBuffer(ourServer.getBaseUrl() + theURL));
when(myRequest.getHeader(any())).thenAnswer(t -> {
String header = t.getArgument(0, String.class);
String value = myHeaders.get(header);
@ -190,6 +233,11 @@ public class ServerConcurrencyTest {
.setOperationOutcome(oo);
}
@Search
public List<Patient> search() throws InterruptedException {
throw new InterruptedException(SEARCH_TIMEOUT_ERROR);
}
@Override
public Class<Patient> getResourceType() {