Merging #46 on TM Fixes

This commit is contained in:
Clebert Suconic 2014-12-16 12:18:10 -05:00
commit b1d6c0b449
8 changed files with 181 additions and 22 deletions

View File

@ -101,4 +101,8 @@ public interface ActiveMQJMSBridgeLogger extends BasicLogger
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 344001, value = "Failed to start source connection" , format = Message.Format.MESSAGE_FORMAT)
void jmsBridgeSrcConnectError(@Cause Exception e);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 344002, value = "Failed to start JMS Bridge. QoS Mode: {0} requires a Transaction Manager, none found" , format = Message.Format.MESSAGE_FORMAT)
void jmsBridgeTransactionManagerMissing(QualityOfServiceMode qosMode);
}

View File

@ -66,7 +66,6 @@ import org.apache.activemq.jms.client.ActiveMQMessage;
import org.apache.activemq.jms.server.ActiveMQJMSServerBundle;
import org.apache.activemq.service.extensions.ServiceUtils;
import org.apache.activemq.service.extensions.xa.recovery.ActiveMQRegistry;
import org.apache.activemq.service.extensions.xa.recovery.ActiveMQRegistryImpl;
import org.apache.activemq.service.extensions.xa.recovery.XARecoveryConfig;
import org.apache.activemq.utils.ClassloadingUtil;
import org.apache.activemq.utils.DefaultSensitiveStringCodec;
@ -402,25 +401,45 @@ public final class JMSBridgeImpl implements JMSBridge
checkParams();
TransactionManager tm = ServiceUtils.getTransactionManager();
// There may already be a JTA transaction associated to the thread
boolean ok;
Transaction toResume = null;
try
// Check to see if the QoSMode requires a TM
if (qualityOfServiceMode.equals(QualityOfServiceMode.AT_MOST_ONCE) ||
qualityOfServiceMode.equals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE))
{
toResume = tm.suspend();
ok = setupJMSObjects();
}
finally
{
if (toResume != null)
if (tm == null)
{
tm.resume(toResume);
tm = ServiceUtils.getTransactionManager();
}
if (tm == null)
{
ActiveMQJMSBridgeLogger.LOGGER.jmsBridgeTransactionManagerMissing(qualityOfServiceMode);
throw new RuntimeException();
}
// There may already be a JTA transaction associated to the thread
Transaction toResume = null;
try
{
toResume = tm.suspend();
ok = setupJMSObjects();
}
finally
{
if (toResume != null)
{
tm.resume(toResume);
}
}
}
else
{
ok = setupJMSObjects();
}
if (ok)
@ -1015,7 +1034,10 @@ public final class JMSBridgeImpl implements JMSBridge
ActiveMQJMSBridgeLogger.LOGGER.trace("Starting JTA transaction");
}
TransactionManager tm = ServiceUtils.getTransactionManager();
if (tm == null)
{
tm = ServiceUtils.getTransactionManager();
}
// Set timeout to a large value since we do not want to time out while waiting for messages
// to arrive - 10 years should be enough
@ -2235,10 +2257,6 @@ public final class JMSBridgeImpl implements JMSBridge
{
registry = sl.iterator().next();
}
else
{
registry = ActiveMQRegistryImpl.getInstance();
}
}
catch (Throwable e)
{

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.service.extensions;
import org.jboss.logging.BasicLogger;
import org.jboss.logging.Logger;
import org.jboss.logging.annotations.LogMessage;
import org.jboss.logging.annotations.Message;
import org.jboss.logging.annotations.MessageLogger;
/**
* @author mtaylor
*
* Logger Code 34
*
* each message id must be 6 digits long starting with 20, the 3rd digit donates the level so
*
* INF0 1
* WARN 2
* DEBUG 3
* ERROR 4
* TRACE 5
* FATAL 6
*
* so an INFO message would be 341000 to 341999
*/
@MessageLogger(projectCode = "AMQ")
public interface ActiveMQServiceExtensionLogger extends BasicLogger
{
/**
* The default logger.
*/
ActiveMQServiceExtensionLogger LOGGER = Logger.getMessageLogger(ActiveMQServiceExtensionLogger.class, ActiveMQServiceExtensionLogger.class.getPackage().getName());
@LogMessage(level = Logger.Level.WARN)
@Message(id = 342000, value = "Attempted to locate a Transaction Manager but none found.", format = Message.Format.MESSAGE_FORMAT)
void transactionManagerNotFound();
}

View File

@ -58,11 +58,19 @@ public class ServiceUtils
if (!transactionManagerLoaded)
{
Iterator<TransactionManagerLocator> it = ServiceLoader.load(TransactionManagerLocator.class).iterator();
if (it.hasNext())
while (it.hasNext() && transactionManager == null)
{
transactionManager = it.next().getTransactionManager();
}
transactionManagerLoaded = true;
if (transactionManager != null)
{
transactionManagerLoaded = true;
}
else
{
ActiveMQServiceExtensionLogger.LOGGER.transactionManagerNotFound();
}
}
return transactionManager;
}

View File

@ -28,9 +28,11 @@ import org.apache.activemq.service.extensions.transactions.TransactionManagerLoc
public class TransactionManagerLocatorImpl implements TransactionManagerLocator
{
public static TransactionManager tm = new TransactionManagerImple();
@Override
public TransactionManager getTransactionManager()
{
return new TransactionManagerImple();
return tm;
}
}

View File

@ -29,6 +29,7 @@ import org.apache.activemq.ra.ActiveMQRAConnectionFactoryImpl;
import org.apache.activemq.ra.ActiveMQRAConnectionManager;
import org.apache.activemq.ra.ActiveMQRAManagedConnectionFactory;
import org.apache.activemq.ra.ActiveMQResourceAdapter;
import org.apache.activemq.tests.integration.jms.bridge.TransactionManagerLocatorImpl;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -63,6 +64,7 @@ public class JMSContextTest extends ActiveMQRATestBase
resourceAdapter.setConnectorClassName(InVMConnectorFactory.class.getName());
MyBootstrapContext ctx = new MyBootstrapContext();
TransactionManagerLocatorImpl.tm = DummyTransactionManager.tm;
resourceAdapter.start(ctx);
ActiveMQRAManagedConnectionFactory mcf = new ActiveMQRAManagedConnectionFactory();
mcf.setResourceAdapter(resourceAdapter);

View File

@ -286,7 +286,6 @@ public class OutgoingConnectionTest extends ActiveMQRATestBase
{
setupDLQ(10);
resourceAdapter = newResourceAdapter();
DummyTransactionManager.tm.tx = new DummyTransaction();
MyBootstrapContext ctx = new MyBootstrapContext();
resourceAdapter.start(ctx);
ActiveMQRAManagedConnectionFactory mcf = new ActiveMQRAManagedConnectionFactory();

View File

@ -36,6 +36,7 @@ import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Field;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@ -65,7 +66,9 @@ import org.apache.activemq.tests.util.UnitTestCase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
/**
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
@ -84,6 +87,9 @@ public class JMSBridgeImplTest extends UnitTestCase
private JMSServerManager jmsServer;
@Rule
public ExpectedException thrown = ExpectedException.none();
// Static --------------------------------------------------------
protected static TransactionManager newTransactionManager()
@ -633,6 +639,72 @@ public class JMSBridgeImplTest extends UnitTestCase
super.tearDown();
}
@Test
public void testTransactionManagerNotSetForDuplicatesOK() throws Exception
{
ConnectionFactoryFactory sourceCFF = JMSBridgeImplTest.newConnectionFactoryFactory(JMSBridgeImplTest.createConnectionFactory());
ConnectionFactoryFactory targetCFF = JMSBridgeImplTest.newConnectionFactoryFactory(JMSBridgeImplTest.createConnectionFactory());
DestinationFactory sourceDF = JMSBridgeImplTest.newDestinationFactory(ActiveMQJMSClient.createQueue(JMSBridgeImplTest.SOURCE));
DestinationFactory targetDF = JMSBridgeImplTest.newDestinationFactory(ActiveMQJMSClient.createQueue(JMSBridgeImplTest.TARGET));
JMSBridgeImpl bridge = new JMSBridgeImpl();
Assert.assertNotNull(bridge);
bridge.setSourceConnectionFactoryFactory(sourceCFF);
bridge.setSourceDestinationFactory(sourceDF);
bridge.setTargetConnectionFactoryFactory(targetCFF);
bridge.setTargetDestinationFactory(targetDF);
bridge.setFailureRetryInterval(10);
bridge.setMaxRetries(1);
bridge.setMaxBatchTime(-1);
bridge.setMaxBatchSize(10);
bridge.setQualityOfServiceMode(QualityOfServiceMode.DUPLICATES_OK);
Assert.assertFalse(bridge.isStarted());
bridge.start();
Field field = JMSBridgeImpl.class.getDeclaredField("tm");
field.setAccessible(true);
assertNull(field.get(bridge));
bridge.stop();
Assert.assertFalse(bridge.isStarted());
}
@Test
public void testThrowErrorWhenTMNotSetForOnceOnly() throws Exception
{
thrown.expect(RuntimeException.class);
ConnectionFactoryFactory sourceCFF = JMSBridgeImplTest.newConnectionFactoryFactory(JMSBridgeImplTest.createConnectionFactory());
ConnectionFactoryFactory targetCFF = JMSBridgeImplTest.newConnectionFactoryFactory(JMSBridgeImplTest.createConnectionFactory());
DestinationFactory sourceDF = JMSBridgeImplTest.newDestinationFactory(ActiveMQJMSClient.createQueue(JMSBridgeImplTest.SOURCE));
DestinationFactory targetDF = JMSBridgeImplTest.newDestinationFactory(ActiveMQJMSClient.createQueue(JMSBridgeImplTest.TARGET));
JMSBridgeImpl bridge = new JMSBridgeImpl();
Assert.assertNotNull(bridge);
bridge.setSourceConnectionFactoryFactory(sourceCFF);
bridge.setSourceDestinationFactory(sourceDF);
bridge.setTargetConnectionFactoryFactory(targetCFF);
bridge.setTargetDestinationFactory(targetDF);
bridge.setFailureRetryInterval(10);
bridge.setMaxRetries(1);
bridge.setMaxBatchTime(-1);
bridge.setMaxBatchSize(10);
bridge.setQualityOfServiceMode(QualityOfServiceMode.ONCE_AND_ONLY_ONCE);
Assert.assertFalse(bridge.isStarted());
bridge.start();
Field field = JMSBridgeImpl.class.getDeclaredField("tm");
field.setAccessible(true);
assertNotNull(field.get(bridge));
bridge.stop();
Assert.assertFalse(bridge.isStarted());
}
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------