From a58b00872ca5ae2dabf5d15181e61a7f8be35a01 Mon Sep 17 00:00:00 2001 From: gtully Date: Wed, 3 Dec 2014 12:15:10 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5472 - deterministiclally propagate root cause exception to listeners on force close due to security excepition on connect, fix and test --- .../apache/activemq/ActiveMQConnection.java | 16 ++- .../activemq/transport/tcp/TcpTransport.java | 2 +- .../usecases/ExceptionListenerTest.java | 115 +++++++++++++----- 3 files changed, 93 insertions(+), 40 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java index 68c8344eae..ed926a11e2 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -1378,10 +1378,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon } // dispose of transport for security exceptions on connection initiation if (exception instanceof SecurityException && command instanceof ConnectionInfo){ - Transport t = transport; - if (null != t){ - ServiceSupport.dispose(t); - } + forceCloseOnSecurityException(exception); } if (jmsEx !=null) { onComplete.onException(jmsEx); @@ -1398,6 +1395,11 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon } } + private void forceCloseOnSecurityException(Throwable exception) { + LOG.trace("force close on security exception:" + this + ", transport=" + transport, exception); + onException(new IOException("Force close due to SecurityException on connect", exception)); + } + public Response syncSendPacket(Command command) throws JMSException { if (isClosed()) { throw new ConnectionClosedException(); @@ -1419,12 +1421,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon } catch(Throwable e) { LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e); } - //dispose of transport for security exceptions if (er.getException() instanceof SecurityException && command instanceof ConnectionInfo){ - Transport t = this.transport; - if (null != t){ - ServiceSupport.dispose(t); - } + forceCloseOnSecurityException(er.getException()); } if (jmsEx !=null) { throw jmsEx; diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java index 367a8fccd1..8f515a8b1a 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java @@ -200,7 +200,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S onException(e); } catch (Throwable e){ stoppedLatch.get().countDown(); - IOException ioe=new IOException("Unexpected error occured: " + e); + IOException ioe=new IOException("Unexpected error occurred: " + e); ioe.initCause(e); onException(ioe); }finally { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExceptionListenerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExceptionListenerTest.java index 4e6be25e9a..1a2e5ec803 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExceptionListenerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExceptionListenerTest.java @@ -16,48 +16,103 @@ */ package org.apache.activemq.usecases; +import java.net.URI; +import java.util.ArrayList; +import java.util.LinkedList; +import javax.jms.Connection; import javax.jms.ExceptionListener; import javax.jms.JMSException; +import javax.jms.JMSSecurityException; +import javax.jms.Session; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ConnectionFailedException; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.security.SimpleAuthenticationPlugin; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import junit.framework.TestCase; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * @author Oliver Belikan - * */ -public class ExceptionListenerTest extends TestCase implements ExceptionListener { - boolean isException; +public class ExceptionListenerTest implements ExceptionListener { + private static final Logger LOG = LoggerFactory.getLogger(ExceptionListenerTest.class); + BrokerService brokerService; + URI brokerUri; + LinkedList exceptionsViaListener = new LinkedList(); - public ExceptionListenerTest(String arg) { - super(arg); + @Before + public void startBroker() throws Exception { + brokerService = new BrokerService(); + brokerService.setAdvisorySupport(false); + brokerService.setUseJmx(false); + brokerService.setPersistent(false); + brokerService.setPlugins(new BrokerPlugin[]{new SimpleAuthenticationPlugin(new ArrayList<>())}); + brokerUri = brokerService.addConnector("tcp://0.0.0.0:0").getConnectUri(); + brokerService.start(); } - public void testOnException() throws Exception { - /* - * TODO not sure yet if this is a valid test - * System.setProperty("activemq.persistenceAdapter", - * "org.apache.activemq.store.vm.VMPersistenceAdapter"); // - * configuration of container and all protocolls BrokerContainerImpl - * container = new BrokerContainerImpl("DefaultBroker"); - * BrokerConnectorImpl connector = new BrokerConnectorImpl(container, - * "vm://localhost", new DefaultWireFormat()); container.start(); - * ActiveMQConnectionFactory factory = new - * ActiveMQConnectionFactory("vm://localhost"); factory.start(); - * Connection connection = factory.createConnection(); - * connection.setExceptionListener(this); connection.start(); Session - * session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - * Destination destination = session.createTopic(getClass().getName()); - * MessageProducer producer = session.createProducer(destination); try { - * Thread.currentThread().sleep(1000); } catch (Exception e) { } - * container.stop(); // now lets try send try { - * producer.send(session.createTextMessage("This will never get - * anywhere")); } catch (JMSException e) { log.info("Caught: " + e); } - * try { Thread.currentThread().sleep(1000); } catch (Exception e) { } - * assertTrue("Should have received an exception", isException); - */ + @After + public void stopBroker() throws Exception { + exceptionsViaListener.clear(); + if (brokerService != null) { + brokerService.stop(); + } + } + + @Test + public void fireOnSecurityException() throws Exception { + doFireOnSecurityException(new ActiveMQConnectionFactory(brokerUri)); + } + + @Test + public void fireOnSecurityExceptionFailover() throws Exception { + doFireOnSecurityException(new ActiveMQConnectionFactory("failover://" + brokerUri)); + } + + public void doFireOnSecurityException(ActiveMQConnectionFactory factory) throws Exception { + factory.setWatchTopicAdvisories(false); + Connection connection = factory.createConnection(); + connection.setExceptionListener(this); + + try { + connection.start(); + fail("Expect securityException"); + } catch (JMSSecurityException expected) { + expected.printStackTrace(); + assertTrue("nested security exception: " + expected, expected.getCause() instanceof SecurityException); + } + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return !exceptionsViaListener.isEmpty(); + } + }); + Throwable expected = exceptionsViaListener.getFirst(); + assertNotNull(expected); + assertNotNull(expected.getCause()); + + assertTrue("expected exception: " + expected, expected.getCause().getCause() instanceof SecurityException); + + try { + connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + fail("Expect error b/c connection is auto closed on security exception above"); + } catch (ConnectionFailedException e) { + } } public void onException(JMSException e) { - isException = true; + LOG.info("onException:" + e, new Throwable("FromHere")); + exceptionsViaListener.add(e); } }