From 9675ecae428d85126429418de544d3a547229f19 Mon Sep 17 00:00:00 2001 From: gtully Date: Thu, 12 Nov 2020 17:17:20 +0000 Subject: [PATCH] ARTEMIS-2985 - don't block netty threads for mqtt protocol actions --- .../protocol/mqtt/MQTTConnectionManager.java | 3 +- .../protocol/mqtt/MQTTProtocolHandler.java | 50 +- .../core/server/impl/ServerConsumerImpl.java | 11 +- .../mqtt/MQTTConnnectionCleanupTest.java | 37 + .../mqtt/imported/MQTTSecurityCRLTest.java | 7 +- .../smoke/replicationflow/SoakPagingTest.java | 785 +++++++++++++++++- 6 files changed, 865 insertions(+), 28 deletions(-) diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java index 00629d998b..f36c962a71 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java @@ -104,8 +104,9 @@ public class MQTTConnectionManager { } session.getConnection().setConnected(true); - session.start(); session.getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_ACCEPTED); + // ensure we don't publish before the CONNACK + session.start(); } } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java index 136ad7ed37..545d34f97e 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java @@ -41,6 +41,7 @@ import io.netty.util.ReferenceCountUtil; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.logs.AuditLogger; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; +import org.apache.activemq.artemis.utils.actors.Actor; /** * This class is responsible for receiving and sending MQTT packets, delegating behaviour to one of the @@ -65,9 +66,12 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { private boolean stopped = false; + private final Actor mqttMessageActor; + public MQTTProtocolHandler(ActiveMQServer server, MQTTProtocolManager protocolManager) { this.server = server; this.protocolManager = protocolManager; + this.mqttMessageActor = new Actor<>(server.getExecutorFactory().getExecutor(), this::act); } void setConnection(MQTTConnection connection, ConnectionEntry entry) throws Exception { @@ -82,23 +86,35 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { + connection.dataReceived(); + MqttMessage message = (MqttMessage) msg; + + // Disconnect if Netty codec failed to decode the stream. + if (message.decoderResult().isFailure()) { + log.debug("Bad Message Disconnecting Client."); + disconnect(true); + return; + } + + if (this.ctx == null) { + this.ctx = ctx; + } + + // let netty handle keepalive response + if (MqttMessageType.PINGREQ == message.fixedHeader().messageType()) { + handlePingreq(); + } else { + mqttMessageActor.act(message); + } + } + + public void act(MqttMessage message) { try { if (stopped) { disconnect(true); return; } - MqttMessage message = (MqttMessage) msg; - - // Disconnect if Netty codec failed to decode the stream. - if (message.decoderResult().isFailure()) { - log.debug("Bad Message Disconnecting Client."); - disconnect(true); - return; - } - - connection.dataReceived(); - if (AuditLogger.isAnyLoggingEnabled()) { AuditLogger.setRemoteAddress(connection.getRemoteAddress()); } @@ -113,7 +129,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { switch (message.fixedHeader().messageType()) { case CONNECT: - handleConnect((MqttConnectMessage) message, ctx); + handleConnect((MqttConnectMessage) message); break; case PUBLISH: handlePublish((MqttPublishMessage) message); @@ -136,9 +152,6 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { case UNSUBSCRIBE: handleUnsubscribe((MqttUnsubscribeMessage) message); break; - case PINGREQ: - handlePingreq(); - break; case DISCONNECT: disconnect(false); break; @@ -150,10 +163,10 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { disconnect(true); } } catch (Exception e) { - log.debug("Error processing Control Packet, Disconnecting Client", e); + log.warn("Error processing Control Packet, Disconnecting Client", e); disconnect(true); } finally { - ReferenceCountUtil.release(msg); + ReferenceCountUtil.release(message); } } @@ -162,8 +175,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { * * @param connect */ - void handleConnect(MqttConnectMessage connect, ChannelHandlerContext ctx) throws Exception { - this.ctx = ctx; + void handleConnect(MqttConnectMessage connect) throws Exception { connectionEntry.ttl = connect.variableHeader().keepAliveTimeSeconds() * 1500L; String clientId = connect.payload().clientIdentifier(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 5bdf82eef0..e5de2c7f4c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -244,11 +244,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { this.creationTime = System.currentTimeMillis(); - if (browseOnly) { - browserDeliverer = new BrowserDeliverer(messageQueue.browserIterator()); - } else { - messageQueue.addConsumer(this); - } this.supportLargeMessage = supportLargeMessage; if (credits != null) { @@ -261,6 +256,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { this.server = server; + if (browseOnly) { + browserDeliverer = new BrowserDeliverer(messageQueue.browserIterator()); + } else { + messageQueue.addConsumer(this); + } + if (session.getRemotingConnection() instanceof CoreRemotingConnection) { CoreRemotingConnection coreRemotingConnection = (CoreRemotingConnection) session.getRemotingConnection(); if (session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null && coreRemotingConnection.getChannelVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTConnnectionCleanupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTConnnectionCleanupTest.java index c5486eba3e..23540f5067 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTConnnectionCleanupTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTConnnectionCleanupTest.java @@ -19,18 +19,23 @@ package org.apache.activemq.artemis.tests.integration.mqtt; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnection; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTestSupport; import org.apache.activemq.artemis.utils.Wait; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; +import org.fusesource.mqtt.client.QoS; +import org.fusesource.mqtt.client.Topic; import org.junit.Test; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; public class MQTTConnnectionCleanupTest extends MQTTTestSupport { @@ -82,4 +87,36 @@ public class MQTTConnnectionCleanupTest extends MQTTTestSupport { connection.disconnect(); } + + @Test(timeout = 30 * 1000) + public void testSlowSubscribeWontBlockKeepAlive() throws Exception { + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId(""); + mqtt.setKeepAlive((short) 1); + + mqtt.setCleanSession(true); + BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + + NettyAcceptor acceptor = (NettyAcceptor) server.getRemotingService().getAcceptor("MQTT"); + assertEquals(1, acceptor.getConnections().size()); + + server.getConfiguration().getBrokerBindingPlugins().add(new ActiveMQServerBindingPlugin() { + @Override + public void beforeAddBinding(Binding binding) throws ActiveMQException { + // take a little nap + try { + TimeUnit.SECONDS.sleep(3); + } catch (Exception ok) { + } + } + }); + + // this should take a while...but should succeed. + connection.subscribe(new Topic[]{new Topic("T.x", QoS.AT_LEAST_ONCE)}); + assertEquals(1, acceptor.getConnections().size()); + + connection.disconnect(); + } + } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTSecurityCRLTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTSecurityCRLTest.java index dd45f5c0c1..3db9734cf2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTSecurityCRLTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTSecurityCRLTest.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.mqtt.imported; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLException; +import java.io.EOFException; import java.io.IOException; import java.util.concurrent.TimeUnit; @@ -118,7 +119,7 @@ public class MQTTSecurityCRLTest extends ActiveMQTestBase { * keytool -import -trustcacerts -alias trust_key -file ca.crt -keystore truststore.jks */ - @Test(expected = SSLException.class) + @Test public void crlRevokedTest() throws Exception { ActiveMQServer server1 = initServer(); @@ -144,7 +145,9 @@ public class MQTTSecurityCRLTest extends ActiveMQTestBase { Message message1 = connection1.receive(5, TimeUnit.SECONDS); assertEquals(payload1, new String(message1.getPayload())); - + fail("We expect an exception of some sort!"); + } catch (SSLException expected) { + } catch (EOFException canHappenAlso) { } finally { if (connection1 != null) { connection1.disconnect(); diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java index 7eb9a647ce..30b21cac9c 100644 --- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java @@ -17,16 +17,36 @@ package org.apache.activemq.artemis.tests.smoke.replicationflow; +import javax.jms.BytesMessage; +import javax.jms.CompletionListener; import javax.jms.Connection; +import javax.jms.ConnectionConsumer; import javax.jms.ConnectionFactory; +import javax.jms.ConnectionMetaData; import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSContext; +import javax.jms.JMSException; +import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageListener; import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.ServerSessionPool; import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; import javax.jms.Topic; +import javax.jms.TopicSubscriber; +import java.io.Serializable; import java.util.Arrays; import java.util.Collection; +import java.util.Enumeration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -35,6 +55,9 @@ import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase; import org.apache.activemq.artemis.utils.RetryRule; import org.apache.activemq.artemis.utils.SpawnedVMSupport; import org.apache.qpid.jms.JmsConnectionFactory; +import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.MQTT; +import org.fusesource.mqtt.client.QoS; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -71,7 +94,7 @@ public class SoakPagingTest extends SmokeTestBase { @Parameterized.Parameters(name = "protocol={0}, type={1}, tx={2}") public static Collection getParams() { - return Arrays.asList(new Object[][]{{"AMQP", "shared", false}, {"AMQP", "queue", false}, {"OPENWIRE", "topic", false}, {"OPENWIRE", "queue", false}, {"CORE", "shared", false}, {"CORE", "queue", false}, + return Arrays.asList(new Object[][]{{"MQTT", "topic", false}, {"AMQP", "shared", false}, {"AMQP", "queue", false}, {"OPENWIRE", "topic", false}, {"OPENWIRE", "queue", false}, {"CORE", "shared", false}, {"CORE", "queue", false}, {"AMQP", "shared", true}, {"AMQP", "queue", true}, {"OPENWIRE", "topic", true}, {"OPENWIRE", "queue", true}, {"CORE", "shared", true}, {"CORE", "queue", true}}); } @@ -99,6 +122,8 @@ public class SoakPagingTest extends SmokeTestBase { private static ConnectionFactory createConnectionFactory(String protocol, String uri) { if (protocol.toUpperCase().equals("OPENWIRE")) { return new org.apache.activemq.ActiveMQConnectionFactory("failover:(" + uri + ")"); + } else if (protocol.toUpperCase().equals("MQTT")) { + return new MQTTCF(); } else if (protocol.toUpperCase().equals("AMQP")) { if (uri.startsWith("tcp://")) { @@ -314,3 +339,761 @@ public class SoakPagingTest extends SmokeTestBase { } } } + +class MQTTCF implements ConnectionFactory, Connection, Session, Topic, MessageConsumer, MessageProducer { + + final MQTT mqtt = new MQTT(); + private String topicName; + private BlockingConnection blockingConnection; + private boolean consumer; + + MQTTCF() { + try { + mqtt.setHost("localhost", 61616); + } catch (Exception ignored) { + } + } + + @Override + public Connection createConnection() throws JMSException { + return new MQTTCF(); + } + + @Override + public Connection createConnection(String userName, String password) throws JMSException { + MQTTCF result = new MQTTCF(); + result.mqtt.setUserName(userName); + result.mqtt.setPassword(password); + return result; + } + + @Override + public JMSContext createContext() { + return null; + } + + @Override + public JMSContext createContext(int sessionMode) { + return null; + } + + @Override + public JMSContext createContext(String userName, String password) { + return null; + } + + @Override + public JMSContext createContext(String userName, String password, int sessionMode) { + return null; + } + + @Override + public String getMessageSelector() throws JMSException { + return null; + } + + @Override + public Message receive() throws JMSException { + return null; + } + + @Override + public Message receive(long timeout) throws JMSException { + final org.fusesource.mqtt.client.Message message; + try { + message = blockingConnection.receive(timeout, TimeUnit.MILLISECONDS); + } catch (Exception e) { + throw new JMSException(e.getMessage()); + } + if (message != null) { + return new TMessage(new String(message.getPayload())); + } + return null; + } + + @Override + public Message receiveNoWait() throws JMSException { + return null; + } + + @Override + public void setDisableMessageID(boolean value) throws JMSException { + + } + + @Override + public boolean getDisableMessageID() throws JMSException { + return false; + } + + @Override + public void setDisableMessageTimestamp(boolean value) throws JMSException { + + } + + @Override + public boolean getDisableMessageTimestamp() throws JMSException { + return false; + } + + @Override + public void setDeliveryMode(int deliveryMode) throws JMSException { + + } + + @Override + public int getDeliveryMode() throws JMSException { + return 0; + } + + @Override + public void setPriority(int defaultPriority) throws JMSException { + + } + + @Override + public int getPriority() throws JMSException { + return 0; + } + + @Override + public void setTimeToLive(long timeToLive) throws JMSException { + + } + + @Override + public long getTimeToLive() throws JMSException { + return 0; + } + + @Override + public Destination getDestination() throws JMSException { + return null; + } + + @Override + public void send(Message message) throws JMSException { + try { + blockingConnection.publish(topicName, message.getBody(String.class).getBytes(), QoS.EXACTLY_ONCE, false); + } catch (Exception e) { + throw new JMSException(e.getMessage()); + } + } + + @Override + public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { + + } + + @Override + public void send(Destination destination, Message message) throws JMSException { + + } + + @Override + public void send(Message message, CompletionListener completionListener) throws JMSException { + + } + + @Override + public void send(Destination destination, + Message message, + int deliveryMode, + int priority, + long timeToLive) throws JMSException { + + } + + @Override + public void send(Destination destination, + Message message, + CompletionListener completionListener) throws JMSException { + + } + + @Override + public void send(Message message, + int deliveryMode, + int priority, + long timeToLive, + CompletionListener completionListener) throws JMSException { + + } + + @Override + public void send(Destination destination, + Message message, + int deliveryMode, + int priority, + long timeToLive, + CompletionListener completionListener) throws JMSException { + + } + + @Override + public long getDeliveryDelay() throws JMSException { + return 0; + } + + @Override + public void setDeliveryDelay(long deliveryDelay) throws JMSException { + + } + + @Override + public BytesMessage createBytesMessage() throws JMSException { + return null; + } + + @Override + public MapMessage createMapMessage() throws JMSException { + return null; + } + + @Override + public Message createMessage() throws JMSException { + return null; + } + + @Override + public ObjectMessage createObjectMessage() throws JMSException { + return null; + } + + @Override + public ObjectMessage createObjectMessage(Serializable object) throws JMSException { + return null; + } + + @Override + public StreamMessage createStreamMessage() throws JMSException { + return null; + } + + @Override + public TextMessage createTextMessage() throws JMSException { + return null; + } + + @Override + public TextMessage createTextMessage(String text) throws JMSException { + return new TMessage(text); + } + + @Override + public boolean getTransacted() throws JMSException { + return false; + } + + @Override + public int getAcknowledgeMode() throws JMSException { + return 0; + } + + @Override + public void commit() throws JMSException { + + } + + @Override + public void rollback() throws JMSException { + + } + + @Override + public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { + return this; + } + + @Override + public Session createSession(int sessionMode) throws JMSException { + return null; + } + + @Override + public Session createSession() throws JMSException { + return null; + } + + @Override + public String getClientID() throws JMSException { + return null; + } + + @Override + public void setClientID(String clientID) throws JMSException { + + } + + @Override + public ConnectionMetaData getMetaData() throws JMSException { + return null; + } + + @Override + public ExceptionListener getExceptionListener() throws JMSException { + return null; + } + + @Override + public void setExceptionListener(ExceptionListener listener) throws JMSException { + + } + + @Override + public void start() throws JMSException { + blockingConnection = mqtt.blockingConnection(); + try { + blockingConnection.connect(); + + if (consumer) { + blockingConnection.subscribe(new org.fusesource.mqtt.client.Topic[]{new org.fusesource.mqtt.client.Topic(topicName, QoS.EXACTLY_ONCE)}); + } + + } catch (Exception e) { + throw new JMSException(e.getMessage()); + } + } + + @Override + public void stop() throws JMSException { + + } + + @Override + public void close() throws JMSException { + + } + + @Override + public ConnectionConsumer createConnectionConsumer(Destination destination, + String messageSelector, + ServerSessionPool sessionPool, + int maxMessages) throws JMSException { + return null; + } + + @Override + public ConnectionConsumer createDurableConnectionConsumer(Topic topic, + String subscriptionName, + String messageSelector, + ServerSessionPool sessionPool, + int maxMessages) throws JMSException { + return null; + } + + @Override + public ConnectionConsumer createSharedDurableConnectionConsumer(Topic topic, + String subscriptionName, + String messageSelector, + ServerSessionPool sessionPool, + int maxMessages) throws JMSException { + return null; + } + + @Override + public ConnectionConsumer createSharedConnectionConsumer(Topic topic, + String subscriptionName, + String messageSelector, + ServerSessionPool sessionPool, + int maxMessages) throws JMSException { + return null; + } + + @Override + public void recover() throws JMSException { + + } + + @Override + public MessageListener getMessageListener() throws JMSException { + return null; + } + + @Override + public void setMessageListener(MessageListener listener) throws JMSException { + + } + + @Override + public void run() { + + } + + @Override + public MessageProducer createProducer(Destination destination) throws JMSException { + return this; + } + + @Override + public MessageConsumer createConsumer(Destination destination) throws JMSException { + consumer = true; + return this; + } + + @Override + public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { + return null; + } + + @Override + public MessageConsumer createConsumer(Destination destination, + String messageSelector, + boolean NoLocal) throws JMSException { + return null; + } + + @Override + public Queue createQueue(String queueName) throws JMSException { + return null; + } + + @Override + public Topic createTopic(String topicName) throws JMSException { + this.topicName = topicName; + return this; + } + + @Override + public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { + return null; + } + + @Override + public TopicSubscriber createDurableSubscriber(Topic topic, + String name, + String messageSelector, + boolean noLocal) throws JMSException { + return null; + } + + @Override + public QueueBrowser createBrowser(Queue queue) throws JMSException { + return null; + } + + @Override + public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { + return null; + } + + @Override + public TemporaryQueue createTemporaryQueue() throws JMSException { + return null; + } + + @Override + public TemporaryTopic createTemporaryTopic() throws JMSException { + return null; + } + + @Override + public void unsubscribe(String name) throws JMSException { + + } + + @Override + public MessageConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName) throws JMSException { + return null; + } + + @Override + public MessageConsumer createSharedConsumer(Topic topic, + String sharedSubscriptionName, + String messageSelector) throws JMSException { + return null; + } + + @Override + public MessageConsumer createDurableConsumer(Topic topic, String name) throws JMSException { + return null; + } + + @Override + public MessageConsumer createDurableConsumer(Topic topic, + String name, + String messageSelector, + boolean noLocal) throws JMSException { + return null; + } + + @Override + public MessageConsumer createSharedDurableConsumer(Topic topic, String name) throws JMSException { + return null; + } + + @Override + public MessageConsumer createSharedDurableConsumer(Topic topic, + String name, + String messageSelector) throws JMSException { + return null; + } + + @Override + public String getTopicName() throws JMSException { + return topicName; + } + + private class TMessage implements Message, TextMessage { + + final String s; + + TMessage(String s) { + this.s = s; + } + + @Override + public String getJMSMessageID() throws JMSException { + return null; + } + + @Override + public void setJMSMessageID(String id) throws JMSException { + + } + + @Override + public long getJMSTimestamp() throws JMSException { + return 0; + } + + @Override + public void setJMSTimestamp(long timestamp) throws JMSException { + + } + + @Override + public byte[] getJMSCorrelationIDAsBytes() throws JMSException { + return new byte[0]; + } + + @Override + public void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException { + + } + + @Override + public void setJMSCorrelationID(String correlationID) throws JMSException { + + } + + @Override + public String getJMSCorrelationID() throws JMSException { + return null; + } + + @Override + public Destination getJMSReplyTo() throws JMSException { + return null; + } + + @Override + public void setJMSReplyTo(Destination replyTo) throws JMSException { + + } + + @Override + public Destination getJMSDestination() throws JMSException { + return null; + } + + @Override + public void setJMSDestination(Destination destination) throws JMSException { + + } + + @Override + public int getJMSDeliveryMode() throws JMSException { + return 0; + } + + @Override + public void setJMSDeliveryMode(int deliveryMode) throws JMSException { + + } + + @Override + public boolean getJMSRedelivered() throws JMSException { + return false; + } + + @Override + public void setJMSRedelivered(boolean redelivered) throws JMSException { + + } + + @Override + public String getJMSType() throws JMSException { + return null; + } + + @Override + public void setJMSType(String type) throws JMSException { + + } + + @Override + public long getJMSExpiration() throws JMSException { + return 0; + } + + @Override + public void setJMSExpiration(long expiration) throws JMSException { + + } + + @Override + public int getJMSPriority() throws JMSException { + return 0; + } + + @Override + public void setJMSPriority(int priority) throws JMSException { + + } + + @Override + public void clearProperties() throws JMSException { + + } + + @Override + public boolean propertyExists(String name) throws JMSException { + return false; + } + + @Override + public boolean getBooleanProperty(String name) throws JMSException { + return false; + } + + @Override + public byte getByteProperty(String name) throws JMSException { + return 0; + } + + @Override + public short getShortProperty(String name) throws JMSException { + return 0; + } + + @Override + public int getIntProperty(String name) throws JMSException { + return 0; + } + + @Override + public long getLongProperty(String name) throws JMSException { + return 0; + } + + @Override + public float getFloatProperty(String name) throws JMSException { + return 0; + } + + @Override + public double getDoubleProperty(String name) throws JMSException { + return 0; + } + + @Override + public String getStringProperty(String name) throws JMSException { + return null; + } + + @Override + public Object getObjectProperty(String name) throws JMSException { + return null; + } + + @Override + public Enumeration getPropertyNames() throws JMSException { + return null; + } + + @Override + public void setBooleanProperty(String name, boolean value) throws JMSException { + + } + + @Override + public void setByteProperty(String name, byte value) throws JMSException { + + } + + @Override + public void setShortProperty(String name, short value) throws JMSException { + + } + + @Override + public void setIntProperty(String name, int value) throws JMSException { + + } + + @Override + public void setLongProperty(String name, long value) throws JMSException { + + } + + @Override + public void setFloatProperty(String name, float value) throws JMSException { + + } + + @Override + public void setDoubleProperty(String name, double value) throws JMSException { + + } + + @Override + public void setStringProperty(String name, String value) throws JMSException { + + } + + @Override + public void setObjectProperty(String name, Object value) throws JMSException { + + } + + @Override + public void acknowledge() throws JMSException { + + } + + @Override + public void clearBody() throws JMSException { + + } + + @Override + public long getJMSDeliveryTime() throws JMSException { + return 0; + } + + @Override + public void setJMSDeliveryTime(long deliveryTime) throws JMSException { + + } + + @Override + public T getBody(Class c) throws JMSException { + return (T) s; + } + + @Override + public boolean isBodyAssignableTo(Class c) throws JMSException { + return false; + } + + @Override + public void setText(String string) throws JMSException { + } + + @Override + public String getText() throws JMSException { + return s; + } + } +}