diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java index 6e6a40506c..707b312892 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.core.protocol.proton.plug; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -23,19 +25,35 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.protocol.proton.ActiveMQProtonRemotingConnection; import org.apache.activemq.artemis.core.protocol.proton.ProtonProtocolManager; import org.apache.activemq.artemis.core.protocol.proton.sasl.ActiveMQPlainSASL; +import org.apache.activemq.artemis.core.server.ServerSession; +import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.utils.ReusableLatch; +import org.apache.activemq.artemis.utils.UUIDGenerator; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.transport.AmqpError; +import org.jboss.logging.Logger; import org.proton.plug.AMQPConnectionCallback; import org.proton.plug.AMQPConnectionContext; import org.proton.plug.AMQPSessionCallback; +import org.proton.plug.SASLResult; import org.proton.plug.ServerSASL; +import org.proton.plug.handler.ExtCapability; import org.proton.plug.sasl.AnonymousServerSASL; +import org.proton.plug.sasl.PlainSASLResult; + +import static org.proton.plug.AmqpSupport.CONTAINER_ID; +import static org.proton.plug.AmqpSupport.INVALID_FIELD; +import static org.proton.plug.context.AbstractConnectionContext.CONNECTION_OPEN_FAILED; public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback { + private static final Logger log = Logger.getLogger(ActiveMQProtonConnectionCallback.class); private final ProtonProtocolManager manager; @@ -49,6 +67,8 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback private final Executor closeExecutor; + private ServerSession internalSession; + public ActiveMQProtonConnectionCallback(ProtonProtocolManager manager, Connection connection, Executor closeExecutor) { @@ -85,8 +105,43 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback return supportsAnonymous; } + @Override + public void init() throws Exception { + //This internal core session is used to represent the connection + //in core server. It is used to identify unique clientIDs. + //Note the Qpid-JMS client does create a initial session + //for each connection. However is comes in as a Begin + //After Open. This makes it unusable for this purpose + //as we need to decide the uniqueness in response to + //Open, and the checking Uniqueness and adding the unique + //client-id to server need to be atomic. + if (internalSession == null) { + SASLResult saslResult = amqpConnection.getSASLResult(); + String user = null; + String passcode = null; + if (saslResult != null) { + user = saslResult.getUser(); + if (saslResult instanceof PlainSASLResult) { + passcode = ((PlainSASLResult) saslResult).getPassword(); + } + } + internalSession = manager.getServer().createSession(createInternalSessionName(), user, passcode, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, protonConnectionDelegate, // RemotingConnection remotingConnection, + false, + false, + false, + false, + null, (SessionCallback) createSessionCallback(this.amqpConnection), true); + } + } + @Override public void close() { + try { + internalSession.close(false); + } + catch (Exception e) { + log.error("error closing internal session", e); + } connection.close(); amqpConnection.close(); } @@ -151,4 +206,28 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback public void sendSASLSupported() { connection.write(ActiveMQBuffers.wrappedBuffer(new byte[]{'A', 'M', 'Q', 'P', 3, 1, 0, 0})); } + + @Override + public boolean validateConnection(org.apache.qpid.proton.engine.Connection connection, SASLResult saslResult) { + String remote = connection.getRemoteContainer(); + + if (ExtCapability.needUniqueConnection(connection)) { + if (!internalSession.addUniqueMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY, remote)) { + //https://issues.apache.org/jira/browse/ARTEMIS-728 + Map connProp = new HashMap<>(); + connProp.put(CONNECTION_OPEN_FAILED, "true"); + connection.setProperties(connProp); + connection.getCondition().setCondition(AmqpError.INVALID_FIELD); + Map info = new HashMap<>(); + info.put(INVALID_FIELD, CONTAINER_ID); + connection.getCondition().setInfo(info); + return false; + } + } + return true; + } + + private String createInternalSessionName() { + return "amqp:" + UUIDGenerator.getInstance().generateStringUUID(); + } } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java index 199d68db38..df14b0f929 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java @@ -17,9 +17,12 @@ package org.proton.plug; import io.netty.buffer.ByteBuf; +import org.apache.qpid.proton.engine.Connection; public interface AMQPConnectionCallback { + void init() throws Exception; + void close(); /** @@ -41,4 +44,6 @@ public interface AMQPConnectionCallback { boolean isSupportsAnonymous(); void sendSASLSupported(); + + boolean validateConnection(Connection connection, SASLResult saslResult); } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java index 158085563a..4ddbbccf34 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java @@ -60,6 +60,7 @@ public class AmqpSupport { // Lifetime policy symbols public static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy"); + public static final Symbol SOLE_CONNECTION_CAPABILITY = Symbol.valueOf("sole-connection-for-container"); /** * Search for a given Symbol in a given array of Symbol object. * diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java index 9ece7903db..120a37ba40 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java @@ -188,6 +188,13 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl return null; } + protected boolean validateConnection(Connection connection) { + return true; + } + + protected void initInternal() throws Exception { + } + // This listener will perform a bunch of things here class LocalListener extends DefaultEventHandler { @@ -213,13 +220,25 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl @Override public void onRemoteOpen(Connection connection) throws Exception { synchronized (getLock()) { - connection.setContext(AbstractConnectionContext.this); - connection.setContainer(containerId); - connection.setProperties(connectionProperties); - connection.setOfferedCapabilities(getConnectionCapabilitiesOffered()); - connection.open(); + try { + initInternal(); + } + catch (Exception e) { + log.error("Error init connection", e); + } + if (!validateConnection(connection)) { + connection.close(); + } + else { + connection.setContext(AbstractConnectionContext.this); + connection.setContainer(containerId); + connection.setProperties(connectionProperties); + connection.setOfferedCapabilities(getConnectionCapabilitiesOffered()); + connection.open(); + } } initialise(); + /* * This can be null which is in effect an empty map, also we really dont need to check this for in bound connections * but its here in case we add support for outbound connections. diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonInitializable.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonInitializable.java index c065527656..266e8b2183 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonInitializable.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonInitializable.java @@ -39,7 +39,7 @@ public class ProtonInitializable { public void initialise() throws Exception { if (!initialized) { - initialized = false; + initialized = true; try { if (afterInit != null) { afterInit.run(); diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java index 83048d17aa..efaaed4ec0 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java @@ -16,10 +16,9 @@ */ package org.proton.plug.context.server; -import static org.proton.plug.AmqpSupport.DELAYED_DELIVERY; - import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.transaction.Coordinator; +import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Link; import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Sender; @@ -30,6 +29,7 @@ import org.proton.plug.AMQPSessionCallback; import org.proton.plug.context.AbstractConnectionContext; import org.proton.plug.context.AbstractProtonSessionContext; import org.proton.plug.exceptions.ActiveMQAMQPException; +import org.proton.plug.handler.ExtCapability; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; @@ -58,6 +58,16 @@ public class ProtonServerConnectionContext extends AbstractConnectionContext imp return protonSession; } + @Override + protected boolean validateConnection(Connection connection) { + return connectionCallback.validateConnection(connection, handler.getSASLResult()); + } + + @Override + protected void initInternal() throws Exception { + connectionCallback.init(); + } + @Override protected void remoteLinkOpened(Link link) throws Exception { @@ -84,6 +94,6 @@ public class ProtonServerConnectionContext extends AbstractConnectionContext imp @Override public Symbol[] getConnectionCapabilitiesOffered() { - return new Symbol[]{DELAYED_DELIVERY}; + return ExtCapability.getCapabilities(); } } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/ExtCapability.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/ExtCapability.java new file mode 100644 index 0000000000..cbb96fd1d2 --- /dev/null +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/ExtCapability.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.proton.plug.handler; + +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.engine.Connection; + +import static org.proton.plug.AmqpSupport.DELAYED_DELIVERY; +import static org.proton.plug.AmqpSupport.SOLE_CONNECTION_CAPABILITY; + +public class ExtCapability { + + public static final Symbol[] capabilities = new Symbol[] { + SOLE_CONNECTION_CAPABILITY, DELAYED_DELIVERY + }; + + public static Symbol[] getCapabilities() { + return capabilities; + } + + public static boolean needUniqueConnection(Connection connection) { + Symbol[] extCapabilities = connection.getRemoteDesiredCapabilities(); + if (extCapabilities != null) { + for (Symbol sym : extCapabilities) { + if (sym.compareTo(SOLE_CONNECTION_CAPABILITY) == 0) { + return true; + } + } + } + return false; + } +} diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java index 7208d16dba..b2f640664a 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java @@ -386,5 +386,4 @@ public class ProtonHandlerImpl extends ProtonInitializable implements ProtonHand } } - } diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java index a9bb152059..91af8f5106 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java @@ -28,6 +28,7 @@ import org.junit.Test; import org.proton.plug.AMQPConnectionCallback; import org.proton.plug.AMQPConnectionContext; import org.proton.plug.AMQPSessionCallback; +import org.proton.plug.SASLResult; import org.proton.plug.ServerSASL; import org.proton.plug.exceptions.ActiveMQAMQPException; import org.proton.plug.handler.EventHandler; @@ -71,6 +72,10 @@ public class AbstractConnectionContextTest { private class TestConnectionCallback implements AMQPConnectionCallback { + @Override + public void init() throws Exception { + } + @Override public void close() { @@ -110,5 +115,10 @@ public class AbstractConnectionContextTest { public void sendSASLSupported() { } + + @Override + public boolean validateConnection(Connection connection, SASLResult saslResult) { + return true; + } } } diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java index f9b2e9a428..bf83f8a71b 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java @@ -21,10 +21,12 @@ import java.util.concurrent.Executors; import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; +import org.apache.qpid.proton.engine.Connection; import org.jboss.logging.Logger; import org.proton.plug.AMQPConnectionContext; import org.proton.plug.AMQPConnectionCallback; import org.proton.plug.AMQPSessionCallback; +import org.proton.plug.SASLResult; import org.proton.plug.ServerSASL; import org.proton.plug.context.server.ProtonServerConnectionContext; import org.proton.plug.sasl.AnonymousServerSASL; @@ -58,6 +60,10 @@ public class ProtonINVMSPI implements AMQPConnectionCallback { }); } + @Override + public void init() throws Exception { + } + @Override public void close() { mainExecutor.shutdown(); @@ -78,6 +84,11 @@ public class ProtonINVMSPI implements AMQPConnectionCallback { } + @Override + public boolean validateConnection(Connection connection, SASLResult saslResult) { + return true; + } + @Override public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) { if (log.isTraceEnabled()) { @@ -125,6 +136,10 @@ public class ProtonINVMSPI implements AMQPConnectionCallback { class ReturnSPI implements AMQPConnectionCallback { + @Override + public void init() throws Exception { + } + @Override public void close() { @@ -145,6 +160,11 @@ public class ProtonINVMSPI implements AMQPConnectionCallback { } + @Override + public boolean validateConnection(Connection connection, SASLResult saslResult) { + return true; + } + @Override public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) { diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java index 17e51c7e5c..fbdee59f03 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java @@ -22,10 +22,12 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import org.apache.qpid.proton.engine.Connection; import org.jboss.logging.Logger; import org.proton.plug.AMQPConnectionContext; import org.proton.plug.AMQPConnectionCallback; import org.proton.plug.AMQPSessionCallback; +import org.proton.plug.SASLResult; import org.proton.plug.ServerSASL; import org.proton.plug.sasl.AnonymousServerSASL; import org.proton.plug.sasl.ServerSASLPlain; @@ -52,6 +54,10 @@ public class AMQPClientSPI implements AMQPConnectionCallback { return connection; } + @Override + public void init() throws Exception { + } + @Override public void close() { @@ -72,6 +78,11 @@ public class AMQPClientSPI implements AMQPConnectionCallback { } + @Override + public boolean validateConnection(Connection connection, SASLResult saslResult) { + return true; + } + final ReusableLatch latch = new ReusableLatch(0); @Override diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java index 02cb06e9b0..055b29deff 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java @@ -25,10 +25,12 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; +import org.apache.qpid.proton.engine.Connection; import org.jboss.logging.Logger; import org.proton.plug.AMQPConnectionContext; import org.proton.plug.AMQPConnectionCallback; import org.proton.plug.AMQPSessionCallback; +import org.proton.plug.SASLResult; import org.proton.plug.ServerSASL; import org.proton.plug.sasl.AnonymousServerSASL; import org.proton.plug.sasl.ServerSASLPlain; @@ -48,6 +50,10 @@ public class MinimalConnectionSPI implements AMQPConnectionCallback { ExecutorService executorService = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()); + @Override + public void init() throws Exception { + } + @Override public void close() { executorService.shutdown(); @@ -80,6 +86,11 @@ public class MinimalConnectionSPI implements AMQPConnectionCallback { } + @Override + public boolean validateConnection(Connection connection, SASLResult saslResult) { + return true; + } + @Override public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) { final int bufferSize = bytes.writerIndex(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java index b3d9a5f20c..245c6b9275 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java @@ -27,6 +27,7 @@ import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.ExceptionListener; +import javax.jms.InvalidClientIDException; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; @@ -1533,6 +1534,35 @@ public class ProtonTest extends ProtonTestBase { connection.close(); } + @Test + public void testClientID() throws Exception { + Connection testConn1 = createConnection(false); + Connection testConn2 = createConnection(false); + try { + testConn1.setClientID("client-id1"); + try { + testConn1.setClientID("client-id2"); + fail("didn't get expected exception"); + } + catch (javax.jms.IllegalStateException e) { + //expected + } + + try { + testConn2.setClientID("client-id1"); + fail("didn't get expected exception"); + } + catch (InvalidClientIDException e) { + //expected + } + } + finally { + testConn1.close(); + testConn2.close(); + } + + } + private javax.jms.Queue createQueue(String address) throws Exception { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); try { @@ -1543,29 +1573,19 @@ public class ProtonTest extends ProtonTestBase { } } - private javax.jms.Connection createConnection() throws JMSException { + private Connection createConnection() throws JMSException { + return this.createConnection(true); + } + + private javax.jms.Connection createConnection(boolean isStart) throws JMSException { Connection connection; if (protocol == 3) { factory = new JmsConnectionFactory(amqpConnectionUri); connection = factory.createConnection(); - connection.setExceptionListener(new ExceptionListener() { - @Override - public void onException(JMSException exception) { - exception.printStackTrace(); - } - }); - connection.start(); } else if (protocol == 0) { factory = new JmsConnectionFactory(userName, password, amqpConnectionUri); connection = factory.createConnection(); - connection.setExceptionListener(new ExceptionListener() { - @Override - public void onException(JMSException exception) { - exception.printStackTrace(); - } - }); - connection.start(); } else { TransportConfiguration transport; @@ -1579,6 +1599,8 @@ public class ProtonTest extends ProtonTestBase { } connection = factory.createConnection(userName, password); + } + if (isStart) { connection.setExceptionListener(new ExceptionListener() { @Override public void onException(JMSException exception) {