This closes #3336
This commit is contained in:
commit
dc8a8d8fa6
|
@ -104,8 +104,9 @@ public class MQTTConnectionManager {
|
|||
}
|
||||
|
||||
session.getConnection().setConnected(true);
|
||||
session.start();
|
||||
session.getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_ACCEPTED);
|
||||
// ensure we don't publish before the CONNACK
|
||||
session.start();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -41,6 +41,7 @@ import io.netty.util.ReferenceCountUtil;
|
|||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.logs.AuditLogger;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
|
||||
import org.apache.activemq.artemis.utils.actors.Actor;
|
||||
|
||||
/**
|
||||
* This class is responsible for receiving and sending MQTT packets, delegating behaviour to one of the
|
||||
|
@ -65,9 +66,12 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
|
|||
|
||||
private boolean stopped = false;
|
||||
|
||||
private final Actor<MqttMessage> mqttMessageActor;
|
||||
|
||||
public MQTTProtocolHandler(ActiveMQServer server, MQTTProtocolManager protocolManager) {
|
||||
this.server = server;
|
||||
this.protocolManager = protocolManager;
|
||||
this.mqttMessageActor = new Actor<>(server.getExecutorFactory().getExecutor(), this::act);
|
||||
}
|
||||
|
||||
void setConnection(MQTTConnection connection, ConnectionEntry entry) throws Exception {
|
||||
|
@ -82,23 +86,35 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
|
|||
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
||||
connection.dataReceived();
|
||||
MqttMessage message = (MqttMessage) msg;
|
||||
|
||||
// Disconnect if Netty codec failed to decode the stream.
|
||||
if (message.decoderResult().isFailure()) {
|
||||
log.debug("Bad Message Disconnecting Client.");
|
||||
disconnect(true);
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.ctx == null) {
|
||||
this.ctx = ctx;
|
||||
}
|
||||
|
||||
// let netty handle keepalive response
|
||||
if (MqttMessageType.PINGREQ == message.fixedHeader().messageType()) {
|
||||
handlePingreq();
|
||||
} else {
|
||||
mqttMessageActor.act(message);
|
||||
}
|
||||
}
|
||||
|
||||
public void act(MqttMessage message) {
|
||||
try {
|
||||
if (stopped) {
|
||||
disconnect(true);
|
||||
return;
|
||||
}
|
||||
|
||||
MqttMessage message = (MqttMessage) msg;
|
||||
|
||||
// Disconnect if Netty codec failed to decode the stream.
|
||||
if (message.decoderResult().isFailure()) {
|
||||
log.debug("Bad Message Disconnecting Client.");
|
||||
disconnect(true);
|
||||
return;
|
||||
}
|
||||
|
||||
connection.dataReceived();
|
||||
|
||||
if (AuditLogger.isAnyLoggingEnabled()) {
|
||||
AuditLogger.setRemoteAddress(connection.getRemoteAddress());
|
||||
}
|
||||
|
@ -113,7 +129,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
|
|||
|
||||
switch (message.fixedHeader().messageType()) {
|
||||
case CONNECT:
|
||||
handleConnect((MqttConnectMessage) message, ctx);
|
||||
handleConnect((MqttConnectMessage) message);
|
||||
break;
|
||||
case PUBLISH:
|
||||
handlePublish((MqttPublishMessage) message);
|
||||
|
@ -136,9 +152,6 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
|
|||
case UNSUBSCRIBE:
|
||||
handleUnsubscribe((MqttUnsubscribeMessage) message);
|
||||
break;
|
||||
case PINGREQ:
|
||||
handlePingreq();
|
||||
break;
|
||||
case DISCONNECT:
|
||||
disconnect(false);
|
||||
break;
|
||||
|
@ -150,10 +163,10 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
|
|||
disconnect(true);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.debug("Error processing Control Packet, Disconnecting Client", e);
|
||||
log.warn("Error processing Control Packet, Disconnecting Client", e);
|
||||
disconnect(true);
|
||||
} finally {
|
||||
ReferenceCountUtil.release(msg);
|
||||
ReferenceCountUtil.release(message);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -162,8 +175,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
|
|||
*
|
||||
* @param connect
|
||||
*/
|
||||
void handleConnect(MqttConnectMessage connect, ChannelHandlerContext ctx) throws Exception {
|
||||
this.ctx = ctx;
|
||||
void handleConnect(MqttConnectMessage connect) throws Exception {
|
||||
connectionEntry.ttl = connect.variableHeader().keepAliveTimeSeconds() * 1500L;
|
||||
|
||||
String clientId = connect.payload().clientIdentifier();
|
||||
|
|
|
@ -244,11 +244,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
|||
|
||||
this.creationTime = System.currentTimeMillis();
|
||||
|
||||
if (browseOnly) {
|
||||
browserDeliverer = new BrowserDeliverer(messageQueue.browserIterator());
|
||||
} else {
|
||||
messageQueue.addConsumer(this);
|
||||
}
|
||||
this.supportLargeMessage = supportLargeMessage;
|
||||
|
||||
if (credits != null) {
|
||||
|
@ -261,6 +256,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
|||
|
||||
this.server = server;
|
||||
|
||||
if (browseOnly) {
|
||||
browserDeliverer = new BrowserDeliverer(messageQueue.browserIterator());
|
||||
} else {
|
||||
messageQueue.addConsumer(this);
|
||||
}
|
||||
|
||||
if (session.getRemotingConnection() instanceof CoreRemotingConnection) {
|
||||
CoreRemotingConnection coreRemotingConnection = (CoreRemotingConnection) session.getRemotingConnection();
|
||||
if (session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null && coreRemotingConnection.getChannelVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
|
||||
|
|
|
@ -19,18 +19,23 @@ package org.apache.activemq.artemis.tests.integration.mqtt;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnection;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTestSupport;
|
||||
import org.apache.activemq.artemis.utils.Wait;
|
||||
import org.fusesource.mqtt.client.BlockingConnection;
|
||||
import org.fusesource.mqtt.client.MQTT;
|
||||
import org.fusesource.mqtt.client.QoS;
|
||||
import org.fusesource.mqtt.client.Topic;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class MQTTConnnectionCleanupTest extends MQTTTestSupport {
|
||||
|
||||
|
@ -82,4 +87,36 @@ public class MQTTConnnectionCleanupTest extends MQTTTestSupport {
|
|||
connection.disconnect();
|
||||
}
|
||||
|
||||
|
||||
@Test(timeout = 30 * 1000)
|
||||
public void testSlowSubscribeWontBlockKeepAlive() throws Exception {
|
||||
MQTT mqtt = createMQTTConnection();
|
||||
mqtt.setClientId("");
|
||||
mqtt.setKeepAlive((short) 1);
|
||||
|
||||
mqtt.setCleanSession(true);
|
||||
BlockingConnection connection = mqtt.blockingConnection();
|
||||
connection.connect();
|
||||
|
||||
NettyAcceptor acceptor = (NettyAcceptor) server.getRemotingService().getAcceptor("MQTT");
|
||||
assertEquals(1, acceptor.getConnections().size());
|
||||
|
||||
server.getConfiguration().getBrokerBindingPlugins().add(new ActiveMQServerBindingPlugin() {
|
||||
@Override
|
||||
public void beforeAddBinding(Binding binding) throws ActiveMQException {
|
||||
// take a little nap
|
||||
try {
|
||||
TimeUnit.SECONDS.sleep(3);
|
||||
} catch (Exception ok) {
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// this should take a while...but should succeed.
|
||||
connection.subscribe(new Topic[]{new Topic("T.x", QoS.AT_LEAST_ONCE)});
|
||||
assertEquals(1, acceptor.getConnections().size());
|
||||
|
||||
connection.disconnect();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.mqtt.imported;
|
|||
|
||||
import javax.net.ssl.SSLContext;
|
||||
import javax.net.ssl.SSLException;
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -118,7 +119,7 @@ public class MQTTSecurityCRLTest extends ActiveMQTestBase {
|
|||
* keytool -import -trustcacerts -alias trust_key -file ca.crt -keystore truststore.jks
|
||||
*/
|
||||
|
||||
@Test(expected = SSLException.class)
|
||||
@Test
|
||||
public void crlRevokedTest() throws Exception {
|
||||
|
||||
ActiveMQServer server1 = initServer();
|
||||
|
@ -144,7 +145,9 @@ public class MQTTSecurityCRLTest extends ActiveMQTestBase {
|
|||
Message message1 = connection1.receive(5, TimeUnit.SECONDS);
|
||||
|
||||
assertEquals(payload1, new String(message1.getPayload()));
|
||||
|
||||
fail("We expect an exception of some sort!");
|
||||
} catch (SSLException expected) {
|
||||
} catch (EOFException canHappenAlso) {
|
||||
} finally {
|
||||
if (connection1 != null) {
|
||||
connection1.disconnect();
|
||||
|
|
|
@ -17,16 +17,36 @@
|
|||
|
||||
package org.apache.activemq.artemis.tests.smoke.replicationflow;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.CompletionListener;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionConsumer;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.ConnectionMetaData;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.ExceptionListener;
|
||||
import javax.jms.JMSContext;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MapMessage;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageListener;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.ObjectMessage;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.QueueBrowser;
|
||||
import javax.jms.ServerSessionPool;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.StreamMessage;
|
||||
import javax.jms.TemporaryQueue;
|
||||
import javax.jms.TemporaryTopic;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.jms.Topic;
|
||||
import javax.jms.TopicSubscriber;
|
||||
import java.io.Serializable;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Enumeration;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
@ -35,6 +55,9 @@ import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
|
|||
import org.apache.activemq.artemis.utils.RetryRule;
|
||||
import org.apache.activemq.artemis.utils.SpawnedVMSupport;
|
||||
import org.apache.qpid.jms.JmsConnectionFactory;
|
||||
import org.fusesource.mqtt.client.BlockingConnection;
|
||||
import org.fusesource.mqtt.client.MQTT;
|
||||
import org.fusesource.mqtt.client.QoS;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
|
@ -71,7 +94,7 @@ public class SoakPagingTest extends SmokeTestBase {
|
|||
|
||||
@Parameterized.Parameters(name = "protocol={0}, type={1}, tx={2}")
|
||||
public static Collection<Object[]> getParams() {
|
||||
return Arrays.asList(new Object[][]{{"AMQP", "shared", false}, {"AMQP", "queue", false}, {"OPENWIRE", "topic", false}, {"OPENWIRE", "queue", false}, {"CORE", "shared", false}, {"CORE", "queue", false},
|
||||
return Arrays.asList(new Object[][]{{"MQTT", "topic", false}, {"AMQP", "shared", false}, {"AMQP", "queue", false}, {"OPENWIRE", "topic", false}, {"OPENWIRE", "queue", false}, {"CORE", "shared", false}, {"CORE", "queue", false},
|
||||
{"AMQP", "shared", true}, {"AMQP", "queue", true}, {"OPENWIRE", "topic", true}, {"OPENWIRE", "queue", true}, {"CORE", "shared", true}, {"CORE", "queue", true}});
|
||||
}
|
||||
|
||||
|
@ -99,6 +122,8 @@ public class SoakPagingTest extends SmokeTestBase {
|
|||
private static ConnectionFactory createConnectionFactory(String protocol, String uri) {
|
||||
if (protocol.toUpperCase().equals("OPENWIRE")) {
|
||||
return new org.apache.activemq.ActiveMQConnectionFactory("failover:(" + uri + ")");
|
||||
} else if (protocol.toUpperCase().equals("MQTT")) {
|
||||
return new MQTTCF();
|
||||
} else if (protocol.toUpperCase().equals("AMQP")) {
|
||||
|
||||
if (uri.startsWith("tcp://")) {
|
||||
|
@ -314,3 +339,761 @@ public class SoakPagingTest extends SmokeTestBase {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
class MQTTCF implements ConnectionFactory, Connection, Session, Topic, MessageConsumer, MessageProducer {
|
||||
|
||||
final MQTT mqtt = new MQTT();
|
||||
private String topicName;
|
||||
private BlockingConnection blockingConnection;
|
||||
private boolean consumer;
|
||||
|
||||
MQTTCF() {
|
||||
try {
|
||||
mqtt.setHost("localhost", 61616);
|
||||
} catch (Exception ignored) {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Connection createConnection() throws JMSException {
|
||||
return new MQTTCF();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Connection createConnection(String userName, String password) throws JMSException {
|
||||
MQTTCF result = new MQTTCF();
|
||||
result.mqtt.setUserName(userName);
|
||||
result.mqtt.setPassword(password);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public JMSContext createContext() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public JMSContext createContext(int sessionMode) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public JMSContext createContext(String userName, String password) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public JMSContext createContext(String userName, String password, int sessionMode) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getMessageSelector() throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message receive() throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message receive(long timeout) throws JMSException {
|
||||
final org.fusesource.mqtt.client.Message message;
|
||||
try {
|
||||
message = blockingConnection.receive(timeout, TimeUnit.MILLISECONDS);
|
||||
} catch (Exception e) {
|
||||
throw new JMSException(e.getMessage());
|
||||
}
|
||||
if (message != null) {
|
||||
return new TMessage(new String(message.getPayload()));
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message receiveNoWait() throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setDisableMessageID(boolean value) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean getDisableMessageID() throws JMSException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setDisableMessageTimestamp(boolean value) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean getDisableMessageTimestamp() throws JMSException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setDeliveryMode(int deliveryMode) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getDeliveryMode() throws JMSException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setPriority(int defaultPriority) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPriority() throws JMSException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setTimeToLive(long timeToLive) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getTimeToLive() throws JMSException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Destination getDestination() throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(Message message) throws JMSException {
|
||||
try {
|
||||
blockingConnection.publish(topicName, message.getBody(String.class).getBytes(), QoS.EXACTLY_ONCE, false);
|
||||
} catch (Exception e) {
|
||||
throw new JMSException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(Destination destination, Message message) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(Message message, CompletionListener completionListener) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(Destination destination,
|
||||
Message message,
|
||||
int deliveryMode,
|
||||
int priority,
|
||||
long timeToLive) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(Destination destination,
|
||||
Message message,
|
||||
CompletionListener completionListener) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(Message message,
|
||||
int deliveryMode,
|
||||
int priority,
|
||||
long timeToLive,
|
||||
CompletionListener completionListener) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(Destination destination,
|
||||
Message message,
|
||||
int deliveryMode,
|
||||
int priority,
|
||||
long timeToLive,
|
||||
CompletionListener completionListener) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getDeliveryDelay() throws JMSException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setDeliveryDelay(long deliveryDelay) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public BytesMessage createBytesMessage() throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MapMessage createMapMessage() throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message createMessage() throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ObjectMessage createObjectMessage() throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public StreamMessage createStreamMessage() throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TextMessage createTextMessage() throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TextMessage createTextMessage(String text) throws JMSException {
|
||||
return new TMessage(text);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean getTransacted() throws JMSException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getAcknowledgeMode() throws JMSException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void commit() throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void rollback() throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Session createSession(int sessionMode) throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Session createSession() throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getClientID() throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setClientID(String clientID) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConnectionMetaData getMetaData() throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExceptionListener getExceptionListener() throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setExceptionListener(ExceptionListener listener) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() throws JMSException {
|
||||
blockingConnection = mqtt.blockingConnection();
|
||||
try {
|
||||
blockingConnection.connect();
|
||||
|
||||
if (consumer) {
|
||||
blockingConnection.subscribe(new org.fusesource.mqtt.client.Topic[]{new org.fusesource.mqtt.client.Topic(topicName, QoS.EXACTLY_ONCE)});
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
throw new JMSException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConnectionConsumer createConnectionConsumer(Destination destination,
|
||||
String messageSelector,
|
||||
ServerSessionPool sessionPool,
|
||||
int maxMessages) throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConnectionConsumer createDurableConnectionConsumer(Topic topic,
|
||||
String subscriptionName,
|
||||
String messageSelector,
|
||||
ServerSessionPool sessionPool,
|
||||
int maxMessages) throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConnectionConsumer createSharedDurableConnectionConsumer(Topic topic,
|
||||
String subscriptionName,
|
||||
String messageSelector,
|
||||
ServerSessionPool sessionPool,
|
||||
int maxMessages) throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConnectionConsumer createSharedConnectionConsumer(Topic topic,
|
||||
String subscriptionName,
|
||||
String messageSelector,
|
||||
ServerSessionPool sessionPool,
|
||||
int maxMessages) throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void recover() throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageListener getMessageListener() throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMessageListener(MessageListener listener) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageProducer createProducer(Destination destination) throws JMSException {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageConsumer createConsumer(Destination destination) throws JMSException {
|
||||
consumer = true;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageConsumer createConsumer(Destination destination,
|
||||
String messageSelector,
|
||||
boolean NoLocal) throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Queue createQueue(String queueName) throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Topic createTopic(String topicName) throws JMSException {
|
||||
this.topicName = topicName;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TopicSubscriber createDurableSubscriber(Topic topic,
|
||||
String name,
|
||||
String messageSelector,
|
||||
boolean noLocal) throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public QueueBrowser createBrowser(Queue queue) throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TemporaryQueue createTemporaryQueue() throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TemporaryTopic createTemporaryTopic() throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unsubscribe(String name) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName) throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageConsumer createSharedConsumer(Topic topic,
|
||||
String sharedSubscriptionName,
|
||||
String messageSelector) throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageConsumer createDurableConsumer(Topic topic, String name) throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageConsumer createDurableConsumer(Topic topic,
|
||||
String name,
|
||||
String messageSelector,
|
||||
boolean noLocal) throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageConsumer createSharedDurableConsumer(Topic topic, String name) throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageConsumer createSharedDurableConsumer(Topic topic,
|
||||
String name,
|
||||
String messageSelector) throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTopicName() throws JMSException {
|
||||
return topicName;
|
||||
}
|
||||
|
||||
private class TMessage implements Message, TextMessage {
|
||||
|
||||
final String s;
|
||||
|
||||
TMessage(String s) {
|
||||
this.s = s;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getJMSMessageID() throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setJMSMessageID(String id) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getJMSTimestamp() throws JMSException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setJMSTimestamp(long timestamp) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getJMSCorrelationIDAsBytes() throws JMSException {
|
||||
return new byte[0];
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setJMSCorrelationID(String correlationID) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getJMSCorrelationID() throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Destination getJMSReplyTo() throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setJMSReplyTo(Destination replyTo) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Destination getJMSDestination() throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setJMSDestination(Destination destination) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getJMSDeliveryMode() throws JMSException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setJMSDeliveryMode(int deliveryMode) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean getJMSRedelivered() throws JMSException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setJMSRedelivered(boolean redelivered) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getJMSType() throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setJMSType(String type) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getJMSExpiration() throws JMSException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setJMSExpiration(long expiration) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getJMSPriority() throws JMSException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setJMSPriority(int priority) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clearProperties() throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean propertyExists(String name) throws JMSException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean getBooleanProperty(String name) throws JMSException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte getByteProperty(String name) throws JMSException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public short getShortProperty(String name) throws JMSException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getIntProperty(String name) throws JMSException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLongProperty(String name) throws JMSException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public float getFloatProperty(String name) throws JMSException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public double getDoubleProperty(String name) throws JMSException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getStringProperty(String name) throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getObjectProperty(String name) throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Enumeration getPropertyNames() throws JMSException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setBooleanProperty(String name, boolean value) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setByteProperty(String name, byte value) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setShortProperty(String name, short value) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setIntProperty(String name, int value) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setLongProperty(String name, long value) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setFloatProperty(String name, float value) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setDoubleProperty(String name, double value) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setStringProperty(String name, String value) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setObjectProperty(String name, Object value) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void acknowledge() throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clearBody() throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getJMSDeliveryTime() throws JMSException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setJMSDeliveryTime(long deliveryTime) throws JMSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T getBody(Class<T> c) throws JMSException {
|
||||
return (T) s;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isBodyAssignableTo(Class c) throws JMSException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setText(String string) throws JMSException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getText() throws JMSException {
|
||||
return s;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue