Feature Request: Add client support for $process-message special operation #407

Changed client to send messages to a processing server:
Object response = client
.operation()
.onServer()
.processMessage()
.setResponseUrlParam("http://myserver/fhir")
.setMessageBundle(msgBundle)
.execute();
This commit is contained in:
Hugo Soares 2017-01-18 19:33:56 +00:00
parent 8f7a374a25
commit 59d29c45b3
9 changed files with 395 additions and 30 deletions

View File

@ -90,6 +90,8 @@ public abstract class BaseClient implements IRestfulClient {
private Boolean myPrettyPrint = false;
private SummaryEnum mySummary;
private final String myUrlBase;
private Boolean myAsync = null;
private String myResponseUrl = null;
BaseClient(IHttpClient theClient, String theUrlBase, RestfulClientFactory theFactory) {
super();
@ -236,6 +238,17 @@ public abstract class BaseClient implements IRestfulClient {
params.put(Constants.PARAM_ELEMENTS, Collections.singletonList(StringUtils.join(theSubsetElements, ',')));
}
/*
Message Client Params
*/
if(myAsync != null && myAsync == true ) {
params.put(Constants.PARAM_ASYNC, Collections.singletonList(myAsync.toString()));
}
if(myResponseUrl != null && !myResponseUrl.isEmpty() ) {
params.put(Constants.PARAM_RESPONSE_URL, Collections.singletonList(myResponseUrl));
}
EncodingEnum encoding = getEncoding();
if (theEncoding != null) {
encoding = theEncoding;
@ -471,6 +484,7 @@ public abstract class BaseClient implements IRestfulClient {
mySummary = theSummary;
}
@Override
public void unregisterInterceptor(IClientInterceptor theInterceptor) {
Validate.notNull(theInterceptor, "Interceptor can not be null");
@ -485,6 +499,16 @@ public abstract class BaseClient implements IRestfulClient {
return preferResponseTypes;
}
@Override
public void setMessageAsync(Boolean isAsync) {
myAsync = isAsync;
}
@Override
public void setMessageResponseUrl(String responseUrl) {
myResponseUrl = responseUrl;
}
protected final class ResourceResponseHandler<T extends IBaseResource> implements IClientResponseHandler<T> {
private boolean myAllowHtmlResponse;

View File

@ -66,6 +66,7 @@ import ca.uhn.fhir.model.primitive.DateTimeDt;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.model.primitive.InstantDt;
import ca.uhn.fhir.model.primitive.UriDt;
import ca.uhn.fhir.model.valueset.BundleTypeEnum;
import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.parser.IParser;
import ca.uhn.fhir.rest.api.MethodOutcome;
@ -102,6 +103,7 @@ import ca.uhn.fhir.rest.gclient.IMetaAddOrDeleteSourced;
import ca.uhn.fhir.rest.gclient.IMetaAddOrDeleteUnsourced;
import ca.uhn.fhir.rest.gclient.IMetaGetUnsourced;
import ca.uhn.fhir.rest.gclient.IOperation;
import ca.uhn.fhir.rest.gclient.IOperationProcessMsg;
import ca.uhn.fhir.rest.gclient.IOperationUnnamed;
import ca.uhn.fhir.rest.gclient.IOperationUntyped;
import ca.uhn.fhir.rest.gclient.IOperationUntypedWithInput;
@ -1443,7 +1445,7 @@ public class GenericClient extends BaseClient implements IGenericClient {
}
@SuppressWarnings("rawtypes")
private class OperationInternal extends BaseClientExecutable implements IOperation, IOperationUnnamed, IOperationUntyped, IOperationUntypedWithInput, IOperationUntypedWithInputAndPartialOutput {
private class OperationInternal extends BaseClientExecutable implements IOperation, IOperationUnnamed, IOperationUntyped, IOperationUntypedWithInput, IOperationUntypedWithInputAndPartialOutput, IOperationProcessMsg {
private IIdType myId;
private String myOperationName;
@ -1451,6 +1453,8 @@ public class GenericClient extends BaseClient implements IGenericClient {
private RuntimeResourceDefinition myParametersDef;
private Class<? extends IBaseResource> myType;
private boolean myUseHttpGet;
private IBaseBundle myMsgBundle;
@SuppressWarnings("unchecked")
private void addParam(String theName, IBase theValue) {
@ -1505,41 +1509,58 @@ public class GenericClient extends BaseClient implements IGenericClient {
@SuppressWarnings("unchecked")
@Override
public Object execute() {
String resourceName;
String id;
if (myType != null) {
resourceName = myContext.getResourceDefinition(myType).getName();
id = null;
} else if (myId != null) {
resourceName = myId.getResourceType();
id = myId.getIdPart();
} else {
resourceName = null;
id = null;
}
if (myOperationName != null && myOperationName.equals(Constants.EXTOP_PROCESS_MESSAGE)) {
//If is $process-message operation
BaseHttpClientInvocation invocation = OperationMethodBinding.createProcessMsgInvocation(myContext, myOperationName, myMsgBundle);
BaseHttpClientInvocation invocation = OperationMethodBinding.createOperationInvocation(myContext, resourceName, id, myOperationName, myParameters, myUseHttpGet);
ResourceResponseHandler handler = new ResourceResponseHandler();
handler.setPreferResponseTypes(getPreferResponseTypes(myType));
ResourceResponseHandler handler = new ResourceResponseHandler();
handler.setPreferResponseTypes(getPreferResponseTypes(myType));
/*
Map<String, List<String>> urlParams = new LinkedHashMap<String, List<String>>();
GenericClient.this.addParam(urlParams, Constants.PARAM_ASYNC, String.valueOf(myAsync));
GenericClient.this.addParam(urlParams, Constants.PARAM_RESPONSE_URL, String.valueOf(myRespondToUri));
*/
Object retVal = invoke(null, handler, invocation);
if (myContext.getResourceDefinition((IBaseResource) retVal).getName().equals("Parameters")) {
return retVal;
} else {
RuntimeResourceDefinition def = myContext.getResourceDefinition("Parameters");
IBaseResource parameters = def.newInstance();
Object retVal = invoke(null, handler, invocation);
return retVal;
} else {
String resourceName;
String id;
if (myType != null) {
resourceName = myContext.getResourceDefinition(myType).getName();
id = null;
} else if (myId != null) {
resourceName = myId.getResourceType();
id = myId.getIdPart();
} else {
resourceName = null;
id = null;
}
BaseRuntimeChildDefinition paramChild = def.getChildByName("parameter");
BaseRuntimeElementCompositeDefinition<?> paramChildElem = (BaseRuntimeElementCompositeDefinition<?>) paramChild.getChildByName("parameter");
IBase parameter = paramChildElem.newInstance();
paramChild.getMutator().addValue(parameters, parameter);
BaseHttpClientInvocation invocation = OperationMethodBinding.createOperationInvocation(myContext, resourceName, id, myOperationName, myParameters, myUseHttpGet);
BaseRuntimeChildDefinition resourceElem = paramChildElem.getChildByName("resource");
resourceElem.getMutator().addValue(parameter, (IBase) retVal);
ResourceResponseHandler handler = new ResourceResponseHandler();
handler.setPreferResponseTypes(getPreferResponseTypes(myType));
return parameters;
}
Object retVal = invoke(null, handler, invocation);
if (myContext.getResourceDefinition((IBaseResource) retVal).getName().equals("Parameters")) {
return retVal;
} else {
RuntimeResourceDefinition def = myContext.getResourceDefinition("Parameters");
IBaseResource parameters = def.newInstance();
BaseRuntimeChildDefinition paramChild = def.getChildByName("parameter");
BaseRuntimeElementCompositeDefinition<?> paramChildElem = (BaseRuntimeElementCompositeDefinition<?>) paramChild.getChildByName("parameter");
IBase parameter = paramChildElem.newInstance();
paramChild.getMutator().addValue(parameters, parameter);
BaseRuntimeChildDefinition resourceElem = paramChildElem.getChildByName("resource");
resourceElem.getMutator().addValue(parameter, (IBase) retVal);
return parameters;
}
}
}
@Override
@ -1549,12 +1570,19 @@ public class GenericClient extends BaseClient implements IGenericClient {
return this;
}
@Override
public IOperationProcessMsg processMessage() {
myOperationName = Constants.EXTOP_PROCESS_MESSAGE;
return this;
}
@Override
public IOperationUnnamed onInstance(IIdType theId) {
myId = theId;
return this;
}
@Override
public IOperationUnnamed onServer() {
return this;
@ -1625,6 +1653,36 @@ public class GenericClient extends BaseClient implements IGenericClient {
return this;
}
@Override
public IOperationProcessMsg setMessageBundle(IBaseBundle theMsgBundle) {
Validate.notNull(theMsgBundle, "theMsgBundle must not be null");
/* Validate.isTrue(theMsgBundle.getType().getValueAsEnum() == BundleTypeEnum.MESSAGE);
Validate.isTrue(theMsgBundle.getEntries().size() > 0);
Validate.notNull(theMsgBundle.getEntries().get(0).getResource(), "Message Bundle first entry must be a MessageHeader resource");
Validate.isTrue(theMsgBundle.getEntries().get(0).getResource().getResourceName().equals("MessageHeader"), "Message Bundle first entry must be a MessageHeader resource");
*/
myMsgBundle = (IBaseBundle) theMsgBundle;
return this;
}
@Override
public IOperationProcessMsg<IBaseOperationOutcome> setAsyncProcessingMode() {
setMessageAsync(true);
return this;
}
@Override
public IOperationProcessMsg setResponseUrlParam(String responseUrl) {
Validate.notEmpty(responseUrl, "responseUrl must not be null");
Validate.matchesPattern(responseUrl, "^(https?)://[-a-zA-Z0-9+&@#/%?=~_|!:,.;]*[-a-zA-Z0-9+&@#/%=~_|]", "responseUrl must be a valid URL");
setMessageResponseUrl(responseUrl);
return this;
}
}
private final class OperationOutcomeResponseHandler implements IClientResponseHandler<BaseOperationOutcome> {

View File

@ -89,6 +89,22 @@ public interface IRestfulClient {
*/
void setSummary(SummaryEnum theSummary);
/**
* Specifies that the server should process the message Synchronously or Asynchronously. This is only applicable to the
* $process-message server operation
*
* @param isAsync The async print flag to use in the request (default is <code>false</code>)
*/
void setMessageAsync(Boolean isAsync);
/**
* Specifies that the server should send Asynchronous responses to this url. This is only applicable to the
* $process-message server operation
*
* @param isAsync The async print flag to use in the request (default is <code>false</code>)
*/
void setMessageResponseUrl(String responseUrl);
/**
* Remove an intercaptor that was previously registered using {@link IRestfulClient#registerInterceptor(IClientInterceptor)}
*/

View File

@ -0,0 +1,24 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package ca.uhn.fhir.rest.gclient;
import org.hl7.fhir.instance.model.api.IBaseBundle;
import org.hl7.fhir.instance.model.api.IBaseOperationOutcome;
import org.hl7.fhir.instance.model.api.IBaseResource;
/**
*
* @author HGS
*/
public interface IOperationProcessMsg<T extends IBaseResource> extends IClientExecutable<IOperationProcessMsg<T>, T> {
IOperationProcessMsg<T> setMessageBundle(IBaseBundle theMsgBundle);
IOperationProcessMsg<T> setAsyncProcessingMode();
IOperationProcessMsg<T> setResponseUrlParam(String respondToUri);
}

View File

@ -24,5 +24,6 @@ package ca.uhn.fhir.rest.gclient;
public interface IOperationUnnamed {
IOperationUntyped named(String theName);
IOperationProcessMsg processMessage();
}

View File

@ -59,6 +59,7 @@ import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.MethodNotAllowedException;
import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor.ActionRequestDetails;
import ca.uhn.fhir.util.FhirTerser;
import org.hl7.fhir.instance.model.api.IBaseBundle;
public class OperationMethodBinding extends BaseResourceReturningMethodBinding {
@ -318,6 +319,21 @@ public class OperationMethodBinding extends BaseResourceReturningMethodBinding {
myDescription = theDescription;
}
public static BaseHttpClientInvocation createProcessMsgInvocation(FhirContext theContext, String theOperationName, IBaseBundle theInput) {
StringBuilder b = new StringBuilder();
if (b.length() > 0) {
b.append('/');
}
if (!theOperationName.startsWith("$")) {
b.append("$");
}
b.append(theOperationName);
return new HttpPostClientInvocation(theContext, theInput, b.toString());
}
public static BaseHttpClientInvocation createOperationInvocation(FhirContext theContext, String theResourceName, String theId, String theOperationName, IBaseParameters theInput, boolean theUseHttpGet) {
StringBuilder b = new StringBuilder();
if (theResourceName != null) {

View File

@ -139,6 +139,9 @@ public class Constants {
public static final String PARAM_TAGS = "_tags";
public static final String PARAM_TEXT = "_text";
public static final String PARAM_VALIDATE = "_validate";
public static final String PARAM_ASYNC = "async"; //Used in messaging
public static final String PARAM_RESPONSE_URL = "response-url"; //Used in messaging
public static final String EXTOP_PROCESS_MESSAGE = "$process-message"; //Used in messaging
public static final String PARAMQUALIFIER_MISSING = ":missing";
public static final String PARAMQUALIFIER_MISSING_FALSE = "false";
public static final String PARAMQUALIFIER_MISSING_TRUE = "true";

View File

@ -2703,6 +2703,16 @@ public class GenericClientDstu2Test {
public void unregisterInterceptor(IClientInterceptor theInterceptor) {
//nothing
}
@Override
public void setMessageAsync(Boolean isAsync) {
}
@Override
public void setMessageResponseUrl(String responseUrl) {
}
}
}

View File

@ -0,0 +1,213 @@
package ca.uhn.fhir.rest.client;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.InputStream;
import java.io.StringReader;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.input.ReaderInputStream;
import org.apache.http.HttpResponse;
import org.apache.http.ProtocolVersion;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.message.BasicHeader;
import org.apache.http.message.BasicStatusLine;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.internal.stubbing.defaultanswers.ReturnsDeepStubs;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.model.dstu2.resource.Bundle;
import ca.uhn.fhir.model.dstu2.resource.MessageHeader;
import ca.uhn.fhir.model.dstu2.resource.OperationOutcome;
import ca.uhn.fhir.model.dstu2.resource.Parameters;
import ca.uhn.fhir.model.dstu2.resource.Patient;
import ca.uhn.fhir.model.dstu2.valueset.BundleTypeEnum;
import ca.uhn.fhir.model.dstu2.valueset.ResponseTypeEnum;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.model.primitive.StringDt;
import ca.uhn.fhir.rest.client.apache.ApacheRestfulClientFactory;
import ca.uhn.fhir.rest.client.interceptor.LoggingInterceptor;
import ca.uhn.fhir.rest.server.Constants;
import ca.uhn.fhir.rest.server.EncodingEnum;
import ca.uhn.fhir.util.TestUtil;
import java.util.Date;
import java.util.UUID;
import org.hl7.fhir.instance.model.api.IBaseBundle;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class MessageClientDstu2Test {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(MessageClientDstu2Test.class);
private FhirContext ourCtx;
private HttpClient myHttpClient;
private HttpResponse myHttpResponse;
@AfterClass
public static void afterClassClearContext() {
TestUtil.clearAllStaticFieldsForUnitTest();
}
@Before
public void before() {
ourCtx = FhirContext.forDstu2();
myHttpClient = mock(HttpClient.class, new ReturnsDeepStubs());
ourCtx.setRestfulClientFactory(new ApacheRestfulClientFactory(ourCtx));
ourCtx.getRestfulClientFactory().setConnectionRequestTimeout(10000);
ourCtx.getRestfulClientFactory().setConnectTimeout(10000);
ourCtx.getRestfulClientFactory().setPoolMaxPerRoute(100);
ourCtx.getRestfulClientFactory().setPoolMaxTotal(100);
ourCtx.getRestfulClientFactory().setHttpClient(myHttpClient);
ourCtx.getRestfulClientFactory().setServerValidationMode(ServerValidationModeEnum.NEVER);
myHttpResponse = mock(HttpResponse.class, new ReturnsDeepStubs());
System.setProperty(BaseClient.HAPI_CLIENT_KEEPRESPONSES, "true");
}
@Test
public void testSendMessageAsync() throws Exception {
OperationOutcome oo = new OperationOutcome();
oo.addIssue().setDiagnostics("FOOBAR");
final String msg = ourCtx.newJsonParser().encodeResourceToString(oo);
ArgumentCaptor<HttpUriRequest> capt = ArgumentCaptor.forClass(HttpUriRequest.class);
when(myHttpClient.execute(capt.capture())).thenReturn(myHttpResponse);
when(myHttpResponse.getStatusLine()).thenReturn(new BasicStatusLine(new ProtocolVersion("HTTP", 1, 1), 200, "OK"));
when(myHttpResponse.getEntity().getContentType()).thenReturn(new BasicHeader("content-type", Constants.CT_FHIR_JSON + "; charset=UTF-8"));
when(myHttpResponse.getEntity().getContent()).thenAnswer(new Answer<InputStream>() {
@Override
public InputStream answer(InvocationOnMock theInvocation) throws Throwable {
return new ReaderInputStream(new StringReader(msg), Charset.forName("UTF-8"));
}
});
IGenericClient client = ourCtx.getRestfulClientFactory().newGenericClient("http://192.168.4.196:83/fhirServer");
client.setEncoding(EncodingEnum.JSON);
// Create the input message to pass to the server
final Bundle msgBundle = getMessageBundle(
"myEvent", "Test Event",
"MySource", "http://myServer/fhir/", "MyDestination", "http://myDestinationServer/fhir/");
// Invoke $process-message
Object response = client
.operation()
.onServer()
.processMessage()
.setAsyncProcessingMode()
.setResponseUrlParam("http://myserver/fhir")
.setMessageBundle(msgBundle)
.execute();
//System.out.println(response);
assertEquals("http://192.168.4.196:83/fhirServer/$process-message?_format=json&async=true&response-url=http%3A%2F%2Fmyserver%2Ffhir", capt.getAllValues().get(0).getURI().toASCIIString());
assertEquals("POST", capt.getAllValues().get(0).getRequestLine().getMethod());
//assertEquals("<Parameters xmlns=\"http://hl7.org/fhir\"><parameter><name value=\"resource\"/><resource><Patient xmlns=\"http://hl7.org/fhir\"><name><given value=\"GIVEN\"/></name></Patient></resource></parameter></Parameters>", extractBody(capt, 0));
//assertNotNull(response.getOperationOutcome());
assertEquals("FOOBAR", ((OperationOutcome) response).getIssueFirstRep().getDiagnosticsElement().getValue());
}
@Test
public void testSendMessage() throws Exception {
final Bundle msgBundleResponse = getMessageBundle(
"myEvent", "Test Event",
"MySource", "http://myServer/fhir/", "MyDestination", "http://myDestinationServer/fhir/");
((MessageHeader) msgBundleResponse.getEntryFirstRep().getResource()).getResponse().setCode(ResponseTypeEnum.OK);
final String msg = ourCtx.newJsonParser().encodeResourceToString(msgBundleResponse);
ArgumentCaptor<HttpUriRequest> capt = ArgumentCaptor.forClass(HttpUriRequest.class);
when(myHttpClient.execute(capt.capture())).thenReturn(myHttpResponse);
when(myHttpResponse.getStatusLine()).thenReturn(new BasicStatusLine(new ProtocolVersion("HTTP", 1, 1), 200, "OK"));
when(myHttpResponse.getEntity().getContentType()).thenReturn(new BasicHeader("content-type", Constants.CT_FHIR_JSON + "; charset=UTF-8"));
when(myHttpResponse.getEntity().getContent()).thenAnswer(new Answer<InputStream>() {
@Override
public InputStream answer(InvocationOnMock theInvocation) throws Throwable {
return new ReaderInputStream(new StringReader(msg), Charset.forName("UTF-8"));
}
});
IGenericClient client = ourCtx.getRestfulClientFactory().newGenericClient("http://192.168.4.196:83/fhirServer");
client.setEncoding(EncodingEnum.JSON);
// Create the input message to pass to the server
final Bundle msgBundle = getMessageBundle(
"myEvent", "Test Event",
"MySource", "http://myServer/fhir/", "MyDestination", "http://myDestinationServer/fhir/");
// Invoke $process-message
Object response = client
.operation()
.onServer()
.processMessage()
.setResponseUrlParam("http://myserver/fhir")
.setMessageBundle(msgBundle)
.execute();
//System.out.println(response);
assertEquals("http://192.168.4.196:83/fhirServer/$process-message?_format=json&response-url=http%3A%2F%2Fmyserver%2Ffhir", capt.getAllValues().get(0).getURI().toASCIIString());
assertEquals("POST", capt.getAllValues().get(0).getRequestLine().getMethod());
//assertEquals("<Parameters xmlns=\"http://hl7.org/fhir\"><parameter><name value=\"resource\"/><resource><Patient xmlns=\"http://hl7.org/fhir\"><name><given value=\"GIVEN\"/></name></Patient></resource></parameter></Parameters>", extractBody(capt, 0));
//assertNotNull(response.getOperationOutcome());
assertEquals("MessageHeader", ((Bundle) response).getEntryFirstRep().getResource().getResourceName());
}
/**
* Criar um FHIR Message Bundle pre-preenchido com os parametros
*
* @param eventCode
* @param eventDisplay
* @param sourceName
* @param sourceEnpoint
* @param destinationName
* @param destinationEndpoint
* @return Message Bundle
*/
public static Bundle getMessageBundle(String eventCode, String eventDisplay, String sourceName, String sourceEnpoint, String destinationName, String destinationEndpoint) {
/*
Init Bundle
*/
Bundle msgBundle = new Bundle();
msgBundle.getMeta().setLastUpdated(new Date());
msgBundle.setType(BundleTypeEnum.MESSAGE); //Document Type
msgBundle.setId(UUID.randomUUID().toString()); // Random ID
/*
Init MessageHeader
*/
MessageHeader msh = new MessageHeader();
msh.setId(UUID.randomUUID().toString());
msh.setTimestampWithMillisPrecision(new Date());
msh.getEvent().setSystem("http://mybServer/fhir/events");
msh.getEvent().setCode(eventCode);
msh.getEvent().setDisplay(eventDisplay);
msh.getSource().setName(sourceName);
msh.getSource().setEndpoint(sourceEnpoint);
msh.getDestinationFirstRep().setName(destinationName);
msh.getDestinationFirstRep().setEndpoint(destinationEndpoint);
Bundle.Entry entry = new Bundle.Entry();
entry.setFullUrl("http://mybase/fhirServer/Bundle/" + msh.getId().getValue());
entry.setResource(msh);
msgBundle.addEntry(entry);
return msgBundle;
}
}