https://issues.apache.org/jira/browse/AMQ-5472 - deterministiclally propagate root cause exception to listeners on force close due to security excepition on connect, fix and test

This commit is contained in:
gtully 2014-12-03 12:15:10 +00:00
parent 9edf907aed
commit a58b00872c
3 changed files with 93 additions and 40 deletions

View File

@ -1378,10 +1378,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
} }
// dispose of transport for security exceptions on connection initiation // dispose of transport for security exceptions on connection initiation
if (exception instanceof SecurityException && command instanceof ConnectionInfo){ if (exception instanceof SecurityException && command instanceof ConnectionInfo){
Transport t = transport; forceCloseOnSecurityException(exception);
if (null != t){
ServiceSupport.dispose(t);
}
} }
if (jmsEx !=null) { if (jmsEx !=null) {
onComplete.onException(jmsEx); onComplete.onException(jmsEx);
@ -1398,6 +1395,11 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
} }
} }
private void forceCloseOnSecurityException(Throwable exception) {
LOG.trace("force close on security exception:" + this + ", transport=" + transport, exception);
onException(new IOException("Force close due to SecurityException on connect", exception));
}
public Response syncSendPacket(Command command) throws JMSException { public Response syncSendPacket(Command command) throws JMSException {
if (isClosed()) { if (isClosed()) {
throw new ConnectionClosedException(); throw new ConnectionClosedException();
@ -1419,12 +1421,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
} catch(Throwable e) { } catch(Throwable e) {
LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e); LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
} }
//dispose of transport for security exceptions
if (er.getException() instanceof SecurityException && command instanceof ConnectionInfo){ if (er.getException() instanceof SecurityException && command instanceof ConnectionInfo){
Transport t = this.transport; forceCloseOnSecurityException(er.getException());
if (null != t){
ServiceSupport.dispose(t);
}
} }
if (jmsEx !=null) { if (jmsEx !=null) {
throw jmsEx; throw jmsEx;

View File

@ -200,7 +200,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
onException(e); onException(e);
} catch (Throwable e){ } catch (Throwable e){
stoppedLatch.get().countDown(); stoppedLatch.get().countDown();
IOException ioe=new IOException("Unexpected error occured: " + e); IOException ioe=new IOException("Unexpected error occurred: " + e);
ioe.initCause(e); ioe.initCause(e);
onException(ioe); onException(ioe);
}finally { }finally {

View File

@ -16,48 +16,103 @@
*/ */
package org.apache.activemq.usecases; package org.apache.activemq.usecases;
import java.net.URI;
import java.util.ArrayList;
import java.util.LinkedList;
import javax.jms.Connection;
import javax.jms.ExceptionListener; import javax.jms.ExceptionListener;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.JMSSecurityException;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ConnectionFailedException;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.security.SimpleAuthenticationPlugin;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import junit.framework.TestCase;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/** /**
* @author Oliver Belikan * @author Oliver Belikan
*
*/ */
public class ExceptionListenerTest extends TestCase implements ExceptionListener { public class ExceptionListenerTest implements ExceptionListener {
boolean isException; private static final Logger LOG = LoggerFactory.getLogger(ExceptionListenerTest.class);
BrokerService brokerService;
URI brokerUri;
LinkedList<Throwable> exceptionsViaListener = new LinkedList<Throwable>();
public ExceptionListenerTest(String arg) { @Before
super(arg); public void startBroker() throws Exception {
brokerService = new BrokerService();
brokerService.setAdvisorySupport(false);
brokerService.setUseJmx(false);
brokerService.setPersistent(false);
brokerService.setPlugins(new BrokerPlugin[]{new SimpleAuthenticationPlugin(new ArrayList<>())});
brokerUri = brokerService.addConnector("tcp://0.0.0.0:0").getConnectUri();
brokerService.start();
} }
public void testOnException() throws Exception { @After
/* public void stopBroker() throws Exception {
* TODO not sure yet if this is a valid test exceptionsViaListener.clear();
* System.setProperty("activemq.persistenceAdapter", if (brokerService != null) {
* "org.apache.activemq.store.vm.VMPersistenceAdapter"); // brokerService.stop();
* configuration of container and all protocolls BrokerContainerImpl }
* container = new BrokerContainerImpl("DefaultBroker"); }
* BrokerConnectorImpl connector = new BrokerConnectorImpl(container,
* "vm://localhost", new DefaultWireFormat()); container.start(); @Test
* ActiveMQConnectionFactory factory = new public void fireOnSecurityException() throws Exception {
* ActiveMQConnectionFactory("vm://localhost"); factory.start(); doFireOnSecurityException(new ActiveMQConnectionFactory(brokerUri));
* Connection connection = factory.createConnection(); }
* connection.setExceptionListener(this); connection.start(); Session
* session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @Test
* Destination destination = session.createTopic(getClass().getName()); public void fireOnSecurityExceptionFailover() throws Exception {
* MessageProducer producer = session.createProducer(destination); try { doFireOnSecurityException(new ActiveMQConnectionFactory("failover://" + brokerUri));
* Thread.currentThread().sleep(1000); } catch (Exception e) { } }
* container.stop(); // now lets try send try {
* producer.send(session.createTextMessage("This will never get public void doFireOnSecurityException(ActiveMQConnectionFactory factory) throws Exception {
* anywhere")); } catch (JMSException e) { log.info("Caught: " + e); } factory.setWatchTopicAdvisories(false);
* try { Thread.currentThread().sleep(1000); } catch (Exception e) { } Connection connection = factory.createConnection();
* assertTrue("Should have received an exception", isException); connection.setExceptionListener(this);
*/
try {
connection.start();
fail("Expect securityException");
} catch (JMSSecurityException expected) {
expected.printStackTrace();
assertTrue("nested security exception: " + expected, expected.getCause() instanceof SecurityException);
}
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return !exceptionsViaListener.isEmpty();
}
});
Throwable expected = exceptionsViaListener.getFirst();
assertNotNull(expected);
assertNotNull(expected.getCause());
assertTrue("expected exception: " + expected, expected.getCause().getCause() instanceof SecurityException);
try {
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
fail("Expect error b/c connection is auto closed on security exception above");
} catch (ConnectionFailedException e) {
}
} }
public void onException(JMSException e) { public void onException(JMSException e) {
isException = true; LOG.info("onException:" + e, new Throwable("FromHere"));
exceptionsViaListener.add(e);
} }
} }