From 381a1ae20611427eddb6a8743e4043c9917cfbbc Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Fri, 21 Apr 2017 10:51:35 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-6659 Honor the sender settle mode from the client and ensure we always set receiver mode to FIRST to reflect we don't support SECOND. Adds tests coverage and needed test client changes for this. --- .../amqp/protocol/AmqpAbstractReceiver.java | 7 ++ .../transport/amqp/protocol/AmqpSender.java | 13 ++- .../transport/amqp/client/AmqpReceiver.java | 52 ++++++++++-- .../transport/amqp/client/AmqpSender.java | 46 +++++++++- .../transport/amqp/client/AmqpSession.java | 74 ++++++++++++++++ .../amqp/interop/AmqpReceiverTest.java | 75 +++++++++++++++++ .../amqp/interop/AmqpSenderTest.java | 84 +++++++++++++++++++ 7 files changed, 337 insertions(+), 14 deletions(-) diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java index 9ed465a2c8..d8ed1089b4 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java @@ -18,6 +18,7 @@ package org.apache.activemq.transport.amqp.protocol; import org.apache.activemq.command.LocalTransactionId; import org.apache.activemq.transport.amqp.AmqpProtocolException; +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Receiver; import org.fusesource.hawtbuf.Buffer; @@ -47,6 +48,12 @@ public abstract class AmqpAbstractReceiver extends AmqpAbstractLink { public AmqpAbstractReceiver(AmqpSession session, Receiver endpoint) { super(session, endpoint); this.configuredCredit = session.getConnection().getConfiguredReceiverCredit(); + + // We don't support second so enforce it as First and let remote decide what to do + this.endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST); + + // Match what the sender mode is + this.endpoint.setSenderSettleMode(endpoint.getRemoteSenderSettleMode()); } @Override diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java index 23f8597b1b..5ac95b2d19 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java @@ -52,6 +52,7 @@ import org.apache.qpid.proton.amqp.transaction.TransactionalState; import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.DeliveryState; import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Link; @@ -77,8 +78,8 @@ public class AmqpSender extends AmqpAbstractLink { private final OutboundTransformer outboundTransformer = new AutoOutboundTransformer(); private final AmqpTransferTagGenerator tagCache = new AmqpTransferTagGenerator(); - private final LinkedList outbound = new LinkedList(); - private final LinkedList dispatchedInTx = new LinkedList(); + private final LinkedList outbound = new LinkedList<>(); + private final LinkedList dispatchedInTx = new LinkedList<>(); private final ConsumerInfo consumerInfo; private AbstractSubscription subscription; @@ -106,8 +107,14 @@ public class AmqpSender extends AmqpAbstractLink { public AmqpSender(AmqpSession session, Sender endpoint, ConsumerInfo consumerInfo) { super(session, endpoint); + // We don't support second so enforce it as First and let remote decide what to do + this.endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST); + + // Match what the sender mode is + this.endpoint.setSenderSettleMode(endpoint.getRemoteSenderSettleMode()); + this.consumerInfo = consumerInfo; - this.presettle = getEndpoint().getRemoteSenderSettleMode() == SenderSettleMode.SETTLED; + this.presettle = getEndpoint().getSenderSettleMode() == SenderSettleMode.SETTLED; } @Override diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java index fbe36ff3f3..cfc355cfed 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java @@ -65,12 +65,15 @@ public class AmqpReceiver extends AmqpAbstractResource { private static final Logger LOG = LoggerFactory.getLogger(AmqpReceiver.class); private final AtomicBoolean closed = new AtomicBoolean(); - private final BlockingQueue prefetch = new LinkedBlockingDeque(); + private final BlockingQueue prefetch = new LinkedBlockingDeque<>(); private final AmqpSession session; private final String address; private final String receiverId; + private final Source userSpecifiedSource; + private final SenderSettleMode userSpecifiedSenderSettlementMode; + private final ReceiverSettleMode userSpecifiedReceiverSettlementMode; private String subscriptionName; private String selector; @@ -84,13 +87,31 @@ public class AmqpReceiver extends AmqpAbstractResource { * Create a new receiver instance. * * @param session - * The parent session that created the receiver. + * The parent session that created the receiver. * @param address * The address that this receiver should listen on. * @param receiverId * The unique ID assigned to this receiver. */ public AmqpReceiver(AmqpSession session, String address, String receiverId) { + this(session, address, receiverId, null, null); + } + + /** + * Create a new receiver instance. + * + * @param session + * The parent session that created the receiver. + * @param address + * The address that this receiver should listen on. + * @param receiverId + * The unique ID assigned to this receiver. + * @param senderMode + * The {@link SenderSettleMode} to use on open. + * @param receiverMode + * The {@link ReceiverSettleMode} to use on open. + */ + public AmqpReceiver(AmqpSession session, String address, String receiverId, SenderSettleMode senderMode, ReceiverSettleMode receiverMode) { if (address != null && address.isEmpty()) { throw new IllegalArgumentException("Address cannot be empty."); @@ -100,6 +121,8 @@ public class AmqpReceiver extends AmqpAbstractResource { this.session = session; this.address = address; this.receiverId = receiverId; + this.userSpecifiedSenderSettlementMode = senderMode; + this.userSpecifiedReceiverSettlementMode = receiverMode; } /** @@ -122,6 +145,8 @@ public class AmqpReceiver extends AmqpAbstractResource { this.userSpecifiedSource = source; this.address = source.getAddress(); this.receiverId = receiverId; + this.userSpecifiedSenderSettlementMode = null; + this.userSpecifiedReceiverSettlementMode = null; } /** @@ -715,12 +740,25 @@ public class AmqpReceiver extends AmqpAbstractResource { Receiver receiver = session.getEndpoint().receiver(receiverName); receiver.setSource(source); receiver.setTarget(target); - if (isPresettle()) { - receiver.setSenderSettleMode(SenderSettleMode.SETTLED); + + if (userSpecifiedSenderSettlementMode != null) { + receiver.setSenderSettleMode(userSpecifiedSenderSettlementMode); + if (SenderSettleMode.SETTLED.equals(userSpecifiedSenderSettlementMode)) { + setPresettle(true); + } } else { - receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); + if (isPresettle()) { + receiver.setSenderSettleMode(SenderSettleMode.SETTLED); + } else { + receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); + } + } + + if (userSpecifiedReceiverSettlementMode != null) { + receiver.setReceiverSettleMode(userSpecifiedReceiverSettlementMode); + } else { + receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); } - receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); setEndpoint(receiver); @@ -788,7 +826,7 @@ public class AmqpReceiver extends AmqpAbstractResource { } protected void configureSource(Source source) { - Map filters = new HashMap(); + Map filters = new HashMap<>(); Symbol[] outcomes = new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL}; diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java index 8d28688f18..a48b856d87 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java @@ -66,7 +66,10 @@ public class AmqpSender extends AmqpAbstractResource { private final AmqpSession session; private final String address; private final String senderId; + private final Target userSpecifiedTarget; + private final SenderSettleMode userSpecifiedSenderSettlementMode; + private final ReceiverSettleMode userSpecifiedReceiverSettlementMode; private boolean presettle; private long sendTimeout = DEFAULT_SEND_TIMEOUT; @@ -90,6 +93,24 @@ public class AmqpSender extends AmqpAbstractResource { * The unique ID assigned to this sender. */ public AmqpSender(AmqpSession session, String address, String senderId) { + this(session, address, senderId, null, null); + } + + /** + * Create a new sender instance. + * + * @param session + * The parent session that created the session. + * @param address + * The address that this sender produces to. + * @param senderId + * The unique ID assigned to this sender. + * @param senderMode + * The {@link SenderSettleMode} to use on open. + * @param receiverMode + * The {@link ReceiverSettleMode} to use on open. + */ + public AmqpSender(AmqpSession session, String address, String senderId, SenderSettleMode senderMode, ReceiverSettleMode receiverMode) { if (address != null && address.isEmpty()) { throw new IllegalArgumentException("Address cannot be empty."); @@ -99,6 +120,8 @@ public class AmqpSender extends AmqpAbstractResource { this.address = address; this.senderId = senderId; this.userSpecifiedTarget = null; + this.userSpecifiedSenderSettlementMode = senderMode; + this.userSpecifiedReceiverSettlementMode = receiverMode; } /** @@ -121,6 +144,8 @@ public class AmqpSender extends AmqpAbstractResource { this.userSpecifiedTarget = target; this.address = target.getAddress(); this.senderId = senderId; + this.userSpecifiedSenderSettlementMode = null; + this.userSpecifiedReceiverSettlementMode = null; } /** @@ -302,12 +327,25 @@ public class AmqpSender extends AmqpAbstractResource { Sender sender = session.getEndpoint().sender(senderName); sender.setSource(source); sender.setTarget(target); - if (presettle) { - sender.setSenderSettleMode(SenderSettleMode.SETTLED); + + if (userSpecifiedSenderSettlementMode != null) { + sender.setSenderSettleMode(userSpecifiedSenderSettlementMode); + if (SenderSettleMode.SETTLED.equals(userSpecifiedSenderSettlementMode)) { + presettle = true; + } } else { - sender.setSenderSettleMode(SenderSettleMode.UNSETTLED); + if (presettle) { + sender.setSenderSettleMode(SenderSettleMode.SETTLED); + } else { + sender.setSenderSettleMode(SenderSettleMode.UNSETTLED); + } + } + + if (userSpecifiedReceiverSettlementMode != null) { + sender.setReceiverSettleMode(userSpecifiedReceiverSettlementMode); + } else { + sender.setReceiverSettleMode(ReceiverSettleMode.FIRST); } - sender.setReceiverSettleMode(ReceiverSettleMode.FIRST); sender.setDesiredCapabilities(desiredCapabilities); sender.setOfferedCapabilities(offeredCapabilities); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index 88cba94369..b8d38e2e9f 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -28,6 +28,8 @@ import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.Target; +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Session; @@ -138,6 +140,42 @@ public class AmqpSession extends AmqpAbstractResource { return createSender(address, presettle, null, null, null); } + /** + * Create a sender instance using the given address + * + * @param address + * the address to which the sender will produce its messages. + * @param senderSettlementMode + * controls the settlement mode used by the created Sender + * @param receiverSettlementMode + * controls the desired settlement mode used by the remote Receiver + * + * @return a newly created sender that is ready for use. + * + * @throws Exception if an error occurs while creating the sender. + */ + public AmqpSender createSender(final String address, final SenderSettleMode senderMode, ReceiverSettleMode receiverMode) throws Exception { + checkClosed(); + + final AmqpSender sender = new AmqpSender(AmqpSession.this, address, getNextSenderId(), senderMode, receiverMode); + final ClientFuture request = new ClientFuture(); + + connection.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + sender.setStateInspector(getStateInspector()); + sender.open(request); + pumpToProtonTransport(request); + } + }); + + request.sync(); + + return sender; + } + /** * Create a sender instance using the given address * @@ -349,6 +387,42 @@ public class AmqpSession extends AmqpAbstractResource { return receiver; } + /** + * Create a receiver instance using the given address + * + * @param address + * the address to which the receiver will subscribe for its messages. + * @param senderSettlementMode + * controls the desired settlement mode used by the remote Sender + * @param receiverSettlementMode + * controls the settlement mode used by the created Receiver + * + * @return a newly created receiver that is ready for use. + * + * @throws Exception if an error occurs while creating the receiver. + */ + public AmqpReceiver createReceiver(String address, SenderSettleMode senderMode, ReceiverSettleMode receiverMode) throws Exception { + checkClosed(); + + final ClientFuture request = new ClientFuture(); + final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, address, getNextReceiverId(), senderMode, receiverMode); + + connection.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + receiver.setStateInspector(getStateInspector()); + receiver.open(request); + pumpToProtonTransport(request); + } + }); + + request.sync(); + + return receiver; + } + /** * Create a receiver instance using the given Source * diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java index f60af7bbd9..7bdafb7ff6 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java @@ -47,6 +47,8 @@ import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.TerminusDurability; import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.message.Message; @@ -124,6 +126,79 @@ public class AmqpReceiverTest extends AmqpClientTestSupport { connection.close(); } + @Test(timeout = 60000) + public void testSenderSettlementModeSettledIsHonored() throws Exception { + doTestSenderSettlementModeIsHonored(SenderSettleMode.SETTLED); + } + + @Test(timeout = 60000) + public void testSenderSettlementModeUnsettledIsHonored() throws Exception { + doTestSenderSettlementModeIsHonored(SenderSettleMode.UNSETTLED); + } + + @Test(timeout = 60000) + public void testSenderSettlementModeMixedIsHonored() throws Exception { + doTestSenderSettlementModeIsHonored(SenderSettleMode.MIXED); + } + + public void doTestSenderSettlementModeIsHonored(SenderSettleMode settleMode) throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = trackConnection(client.connect()); + AmqpSession session = connection.createSession(); + + assertEquals(0, brokerService.getAdminView().getQueues().length); + + AmqpReceiver receiver = session.createReceiver("queue://" + getTestName(), settleMode, ReceiverSettleMode.FIRST); + + assertEquals(1, brokerService.getAdminView().getQueues().length); + assertNotNull(getProxyToQueue(getTestName())); + assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length); + + assertEquals(settleMode, receiver.getEndpoint().getRemoteSenderSettleMode()); + + receiver.close(); + assertEquals(0, brokerService.getAdminView().getQueueSubscribers().length); + + connection.close(); + } + + @Test(timeout = 60000) + public void testReceiverSettlementModeSetToFirst() throws Exception { + doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode.FIRST); + } + + @Test(timeout = 60000) + public void testReceiverSettlementModeSetToSecond() throws Exception { + doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode.SECOND); + } + + /* + * The Broker does not currently support ReceiverSettleMode of SECOND so we ensure that + * it always drops that back to FIRST to let the client know. The client will need to + * check and react accordingly. + */ + private void doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode modeToUse) throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = trackConnection(client.connect()); + AmqpSession session = connection.createSession(); + + assertEquals(0, brokerService.getAdminView().getQueues().length); + + AmqpReceiver receiver = session.createReceiver( + "queue://" + getTestName(), SenderSettleMode.MIXED, modeToUse); + + assertEquals(1, brokerService.getAdminView().getQueues().length); + assertNotNull(getProxyToQueue(getTestName())); + assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length); + + assertEquals(ReceiverSettleMode.FIRST, receiver.getEndpoint().getRemoteReceiverSettleMode()); + + receiver.close(); + assertEquals(0, brokerService.getAdminView().getQueueSubscribers().length); + + connection.close(); + } + @Test(timeout = 60000) public void testCreateQueueReceiverWithJMSSelector() throws Exception { AmqpClient client = createAmqpClient(); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java index da9e011246..79ff275327 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java @@ -37,6 +37,8 @@ import org.apache.activemq.transport.amqp.client.AmqpSupport; import org.apache.activemq.transport.amqp.client.AmqpValidator; import org.apache.activemq.util.Wait; import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Sender; import org.junit.Test; @@ -49,6 +51,7 @@ public class AmqpSenderTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testCreateQueueSender() throws Exception { AmqpClient client = createAmqpClient(); + AmqpConnection connection = trackConnection(client.connect()); AmqpSession session = connection.createSession(); @@ -84,6 +87,87 @@ public class AmqpSenderTest extends AmqpClientTestSupport { connection.close(); } + @Test(timeout = 60000) + public void testSenderSettlementModeSettledIsHonored() throws Exception { + doTestSenderSettlementModeIsHonored(SenderSettleMode.SETTLED); + } + + @Test(timeout = 60000) + public void testSenderSettlementModeUnsettledIsHonored() throws Exception { + doTestSenderSettlementModeIsHonored(SenderSettleMode.UNSETTLED); + } + + @Test(timeout = 60000) + public void testSenderSettlementModeMixedIsHonored() throws Exception { + doTestSenderSettlementModeIsHonored(SenderSettleMode.MIXED); + } + + public void doTestSenderSettlementModeIsHonored(SenderSettleMode settleMode) throws Exception { + AmqpClient client = createAmqpClient(); + + client.setTraceFrames(true); + + AmqpConnection connection = trackConnection(client.connect()); + AmqpSession session = connection.createSession(); + + assertEquals(0, brokerService.getAdminView().getQueues().length); + + AmqpSender sender = session.createSender( + "queue://" + getTestName(), settleMode, ReceiverSettleMode.FIRST); + + assertEquals(1, brokerService.getAdminView().getQueues().length); + assertNotNull(getProxyToQueue(getTestName())); + assertEquals(1, brokerService.getAdminView().getQueueProducers().length); + + assertEquals(settleMode, sender.getEndpoint().getRemoteSenderSettleMode()); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + sender.send(message); + + sender.close(); + assertEquals(0, brokerService.getAdminView().getQueueProducers().length); + + connection.close(); + } + + @Test(timeout = 60000) + public void testReceiverSettlementModeSetToFirst() throws Exception { + doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode.FIRST); + } + + @Test(timeout = 60000) + public void testReceiverSettlementModeSetToSecond() throws Exception { + doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode.SECOND); + } + + /* + * The Broker does not currently support ReceiverSettleMode of SECOND so we ensure that + * it always drops that back to FIRST to let the client know. The client will need to + * check and react accordingly. + */ + private void doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode modeToUse) throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = trackConnection(client.connect()); + AmqpSession session = connection.createSession(); + + assertEquals(0, brokerService.getAdminView().getQueues().length); + + AmqpSender sender = session.createSender( + "queue://" + getTestName(), SenderSettleMode.UNSETTLED, modeToUse); + + assertEquals(1, brokerService.getAdminView().getQueues().length); + assertNotNull(getProxyToQueue(getTestName())); + assertEquals(1, brokerService.getAdminView().getQueueProducers().length); + + assertEquals(ReceiverSettleMode.FIRST, sender.getEndpoint().getRemoteReceiverSettleMode()); + + sender.close(); + assertEquals(0, brokerService.getAdminView().getQueueProducers().length); + + connection.close(); + } + @Test(timeout = 60000) public void testSendMessageToQueue() throws Exception { AmqpClient client = createAmqpClient();