This closes #214

This commit is contained in:
Clebert Suconic 2015-10-23 10:25:28 -04:00
commit 851d4c35df
2 changed files with 141 additions and 1 deletions

View File

@ -58,6 +58,8 @@ import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.TransactionRolledbackException;
import javax.transaction.xa.XAResource;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
@ -67,6 +69,7 @@ import java.util.Map.Entry;
import java.util.ServiceLoader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
public final class JMSBridgeImpl implements JMSBridge {
@ -176,6 +179,8 @@ public final class JMSBridgeImpl implements JMSBridge {
private ActiveMQRegistry registry;
private ClassLoader moduleTccl;
/*
* Constructor for MBean
*/
@ -319,6 +324,12 @@ public final class JMSBridgeImpl implements JMSBridge {
stopping = false;
}
moduleTccl = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
public ClassLoader run() {
return Thread.currentThread().getContextClassLoader();
}
});
locateRecoveryRegistry();
if (started) {
@ -1545,7 +1556,22 @@ public final class JMSBridgeImpl implements JMSBridge {
* and 1 for the eventual failureHandler)
*/
private ExecutorService createExecutor() {
return Executors.newFixedThreadPool(3);
ExecutorService service = Executors.newFixedThreadPool(3, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
final Thread thr = new Thread(r);
if (moduleTccl != null) {
AccessController.doPrivileged(new PrivilegedAction() {
public Object run() {
thr.setContextClassLoader(moduleTccl);
return null;
}
});
}
return thr;
}
});
return service;
}
// Inner classes ---------------------------------------------------------------

View File

@ -68,6 +68,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
public class JMSBridgeImplTest extends ActiveMQTestBase {
@ -83,6 +84,8 @@ public class JMSBridgeImplTest extends ActiveMQTestBase {
private JMSServerManager jmsServer;
private static final AtomicBoolean tcclClassFound = new AtomicBoolean(false);
@Rule
public ExpectedException thrown = ExpectedException.none();
@ -148,6 +151,26 @@ public class JMSBridgeImplTest extends ActiveMQTestBase {
return cf;
}
private static ConnectionFactoryFactory newTCCLAwareConnectionFactoryFactory(final ConnectionFactory cf) {
return new ConnectionFactoryFactory() {
public ConnectionFactory createConnectionFactory() throws Exception {
loadATCCLClass();
return cf;
}
private void loadATCCLClass() {
ClassLoader tcclClassLoader = Thread.currentThread().getContextClassLoader();
try {
tcclClassLoader.loadClass("com.class.only.visible.to.tccl.SomeClass");
tcclClassFound.set(true);
}
catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
};
}
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
@ -546,9 +569,80 @@ public class JMSBridgeImplTest extends ActiveMQTestBase {
}
@Test
public void testStartWithSpecificTCCL() throws Exception {
MockContextClassLoader mockTccl = setMockTCCL();
try {
final AtomicReference<Connection> sourceConn = new AtomicReference<Connection>();
ActiveMQJMSConnectionFactory failingSourceCF = new ActiveMQJMSConnectionFactory(false, new TransportConfiguration(InVMConnectorFactory.class.getName())) {
private static final long serialVersionUID = -8866390811966688830L;
@Override
public Connection createConnection() throws JMSException {
sourceConn.set(super.createConnection());
return sourceConn.get();
}
};
// Note! We disable automatic reconnection on the session factory. The bridge needs to do the reconnection
failingSourceCF.setReconnectAttempts(0);
failingSourceCF.setBlockOnNonDurableSend(true);
failingSourceCF.setBlockOnDurableSend(true);
ConnectionFactoryFactory sourceCFF = JMSBridgeImplTest.newTCCLAwareConnectionFactoryFactory(failingSourceCF);
ConnectionFactoryFactory targetCFF = JMSBridgeImplTest.newConnectionFactoryFactory(JMSBridgeImplTest.createConnectionFactory());
DestinationFactory sourceDF = JMSBridgeImplTest.newDestinationFactory(ActiveMQJMSClient.createQueue(JMSBridgeImplTest.SOURCE));
DestinationFactory targetDF = JMSBridgeImplTest.newDestinationFactory(ActiveMQJMSClient.createQueue(JMSBridgeImplTest.TARGET));
TransactionManager tm = JMSBridgeImplTest.newTransactionManager();
JMSBridgeImpl bridge = new JMSBridgeImpl();
Assert.assertNotNull(bridge);
bridge.setSourceConnectionFactoryFactory(sourceCFF);
bridge.setSourceDestinationFactory(sourceDF);
bridge.setTargetConnectionFactoryFactory(targetCFF);
bridge.setTargetDestinationFactory(targetDF);
bridge.setFailureRetryInterval(10);
bridge.setMaxRetries(2);
bridge.setMaxBatchSize(1);
bridge.setMaxBatchTime(-1);
bridge.setTransactionManager(tm);
bridge.setQualityOfServiceMode(QualityOfServiceMode.AT_MOST_ONCE);
Assert.assertFalse(bridge.isStarted());
bridge.start();
Assert.assertTrue(bridge.isStarted());
unsetMockTCCL(mockTccl);
tcclClassFound.set(false);
sourceConn.get().getExceptionListener().onException(new JMSException("exception on the source"));
Thread.sleep(4 * bridge.getFailureRetryInterval());
// reconnection must have succeeded
Assert.assertTrue(bridge.isStarted());
bridge.stop();
Assert.assertFalse(bridge.isStarted());
assertTrue(tcclClassFound.get());
}
finally {
if (mockTccl != null) unsetMockTCCL(mockTccl);
}
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
private static MockContextClassLoader setMockTCCL() {
ClassLoader parent = JMSBridgeImpl.class.getClassLoader();
MockContextClassLoader tccl = new MockContextClassLoader(parent);
Thread.currentThread().setContextClassLoader(tccl);
return tccl;
}
private static void unsetMockTCCL(MockContextClassLoader mockTccl) {
Thread.currentThread().setContextClassLoader(mockTccl.getOriginal());
}
@Override
@Before
@ -633,5 +727,25 @@ public class JMSBridgeImplTest extends ActiveMQTestBase {
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
private static class MockContextClassLoader extends ClassLoader {
private final ClassLoader original;
private final String knownClass = "com.class.only.visible.to.tccl.SomeClass";
public MockContextClassLoader(ClassLoader parent) {
super(parent);
original = Thread.currentThread().getContextClassLoader();
}
public ClassLoader getOriginal() {
return original;
}
@Override
protected Class<?> findClass(String name) throws ClassNotFoundException {
if (knownClass.equals(name)) {
return null;
}
return super.findClass(name);
}
}
}