mirror of https://github.com/apache/activemq.git
Merge branch 'trunk' of https://git-wip-us.apache.org/repos/asf/activemq into trunk
This commit is contained in:
commit
ae37bb1d59
|
@ -205,6 +205,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
|||
private boolean messagePrioritySupported = true;
|
||||
private boolean transactedIndividualAck = false;
|
||||
private boolean nonBlockingRedelivery = false;
|
||||
private boolean rmIdFromConnectionId = false;
|
||||
|
||||
private int maxThreadPoolSize = DEFAULT_THREAD_POOL_SIZE;
|
||||
private RejectedExecutionHandler rejectedTaskHandler = null;
|
||||
|
@ -1654,6 +1655,9 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
|||
* @throws JMSException
|
||||
*/
|
||||
public String getResourceManagerId() throws JMSException {
|
||||
if (isRmIdFromConnectionId()) {
|
||||
return info.getConnectionId().getValue();
|
||||
}
|
||||
waitForBrokerInfo();
|
||||
if (brokerInfo == null) {
|
||||
throw new JMSException("Connection failed before Broker info was received.");
|
||||
|
@ -2590,6 +2594,14 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
|||
this.nonBlockingRedelivery = nonBlockingRedelivery;
|
||||
}
|
||||
|
||||
public boolean isRmIdFromConnectionId() {
|
||||
return rmIdFromConnectionId;
|
||||
}
|
||||
|
||||
public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) {
|
||||
this.rmIdFromConnectionId = rmIdFromConnectionId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes any TempDestinations that this connection has cached, ignoring
|
||||
* any exceptions generated because the destination is in use as they should
|
||||
|
|
|
@ -179,6 +179,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
|
|||
private TaskRunnerFactory sessionTaskRunner;
|
||||
private RejectedExecutionHandler rejectedTaskHandler = null;
|
||||
protected int xaAckMode = -1; // ensure default init before setting via brokerUrl introspection in sub class
|
||||
private boolean rmIdFromConnectionId = false;
|
||||
|
||||
// /////////////////////////////////////////////
|
||||
//
|
||||
|
@ -221,7 +222,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
/*boolean*
|
||||
* @param brokerURL
|
||||
* @return
|
||||
* @throws URISyntaxException
|
||||
|
@ -401,6 +402,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
|
|||
connection.setSessionTaskRunner(getSessionTaskRunner());
|
||||
connection.setRejectedTaskHandler(getRejectedTaskHandler());
|
||||
connection.setNestedMapAndListEnabled(isNestedMapAndListEnabled());
|
||||
connection.setRmIdFromConnectionId(isRmIdFromConnectionId());
|
||||
if (transportListener != null) {
|
||||
connection.addTransportListener(transportListener);
|
||||
}
|
||||
|
@ -821,6 +823,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
|
|||
props.setProperty("maxThreadPoolSize", Integer.toString(getMaxThreadPoolSize()));
|
||||
props.setProperty("nestedMapAndListEnabled", Boolean.toString(isNestedMapAndListEnabled()));
|
||||
props.setProperty("consumerFailoverRedeliveryWaitPeriod", Long.toString(getConsumerFailoverRedeliveryWaitPeriod()));
|
||||
props.setProperty("rmIdFromConnectionId", Boolean.toString(isRmIdFromConnectionId()));
|
||||
}
|
||||
|
||||
public boolean isUseCompression() {
|
||||
|
@ -1205,4 +1208,18 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
|
|||
public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
|
||||
this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
|
||||
}
|
||||
|
||||
|
||||
public boolean isRmIdFromConnectionId() {
|
||||
return rmIdFromConnectionId;
|
||||
}
|
||||
|
||||
/**
|
||||
* uses the connection id as the resource identity for XAResource.isSameRM
|
||||
* ensuring join will only occur on a single connection
|
||||
*/
|
||||
public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) {
|
||||
this.rmIdFromConnectionId = rmIdFromConnectionId;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -331,6 +331,7 @@
|
|||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>net.alchim31.maven</groupId>
|
||||
|
@ -406,6 +407,36 @@
|
|||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.codehaus.mojo</groupId>
|
||||
<artifactId>build-helper-maven-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>add-source</id>
|
||||
<phase>generate-sources</phase>
|
||||
<goals>
|
||||
<goal>add-source</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<sources>
|
||||
<source>${basedir}/src/main/scala</source>
|
||||
</sources>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>add-test-source</id>
|
||||
<phase>generate-test-sources</phase>
|
||||
<goals>
|
||||
<goal>add-test-source</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<sources>
|
||||
<source>${basedir}/src/test/scala</source>
|
||||
</sources>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
<pluginManagement>
|
||||
<plugins>
|
||||
|
@ -435,6 +466,22 @@
|
|||
</lifecycleMappingMetadata>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-eclipse-plugin</artifactId>
|
||||
<configuration>
|
||||
<buildcommands>
|
||||
<java.lang.String>org.scala-ide.sdt.core.scalabuilder</java.lang.String>
|
||||
</buildcommands>
|
||||
<projectnatures>
|
||||
<nature>org.scala-ide.sdt.core.scalanature</nature>
|
||||
<nature>org.eclipse.jdt.core.javanature</nature>
|
||||
</projectnatures>
|
||||
<sourceIncludes>
|
||||
<sourceInclude>**/*.scala</sourceInclude>
|
||||
</sourceIncludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</pluginManagement>
|
||||
</build>
|
||||
|
|
|
@ -24,11 +24,14 @@ import javax.transaction.xa.XAResource;
|
|||
import javax.transaction.xa.Xid;
|
||||
|
||||
import org.apache.activemq.TransactionContext;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Used to provide a LocalTransaction and XAResource to a JMS session.
|
||||
*/
|
||||
public class LocalAndXATransaction implements XAResource, LocalTransaction {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(LocalAndXATransaction.class);
|
||||
|
||||
private final TransactionContext transactionContext;
|
||||
private boolean inManagedTx;
|
||||
|
@ -86,6 +89,7 @@ public class LocalAndXATransaction implements XAResource, LocalTransaction {
|
|||
}
|
||||
|
||||
public void end(Xid arg0, int arg1) throws XAException {
|
||||
LOG.debug("{} end {} with {}", new Object[]{this, arg0, arg1});
|
||||
try {
|
||||
transactionContext.end(arg0, arg1);
|
||||
} finally {
|
||||
|
@ -106,14 +110,16 @@ public class LocalAndXATransaction implements XAResource, LocalTransaction {
|
|||
}
|
||||
|
||||
public boolean isSameRM(XAResource xaresource) throws XAException {
|
||||
if (xaresource == null) {
|
||||
return false;
|
||||
boolean isSame = false;
|
||||
if (xaresource != null) {
|
||||
// Do we have to unwrap?
|
||||
if (xaresource instanceof LocalAndXATransaction) {
|
||||
xaresource = ((LocalAndXATransaction)xaresource).transactionContext;
|
||||
}
|
||||
isSame = transactionContext.isSameRM(xaresource);
|
||||
}
|
||||
// Do we have to unwrap?
|
||||
if (xaresource instanceof LocalAndXATransaction) {
|
||||
xaresource = ((LocalAndXATransaction)xaresource).transactionContext;
|
||||
}
|
||||
return transactionContext.isSameRM(xaresource);
|
||||
LOG.trace("{} isSameRM({}) = {}", new Object[]{this, xaresource, isSame});
|
||||
return isSame;
|
||||
}
|
||||
|
||||
public int prepare(Xid arg0) throws XAException {
|
||||
|
@ -133,6 +139,7 @@ public class LocalAndXATransaction implements XAResource, LocalTransaction {
|
|||
}
|
||||
|
||||
public void start(Xid arg0, int arg1) throws XAException {
|
||||
LOG.trace("{} start {} with {}", new Object[]{this, arg0, arg1});
|
||||
transactionContext.start(arg0, arg1);
|
||||
try {
|
||||
setInManagedTx(true);
|
||||
|
|
|
@ -185,6 +185,46 @@ public class ActiveMQXAConnectionFactoryTest extends CombinationTestSupport {
|
|||
}
|
||||
}
|
||||
|
||||
public void testIsSameRMOverride() throws URISyntaxException, JMSException, XAException {
|
||||
|
||||
XAConnection connection1 = null;
|
||||
XAConnection connection2 = null;
|
||||
try {
|
||||
ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false&jms.rmIdFromConnectionId=true");
|
||||
connection1 = (XAConnection)cf1.createConnection();
|
||||
XASession session1 = connection1.createXASession();
|
||||
XAResource resource1 = session1.getXAResource();
|
||||
|
||||
ActiveMQXAConnectionFactory cf2 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
|
||||
connection2 = (XAConnection)cf2.createConnection();
|
||||
XASession session2 = connection2.createXASession();
|
||||
XAResource resource2 = session2.getXAResource();
|
||||
|
||||
assertFalse(resource1.isSameRM(resource2));
|
||||
|
||||
// ensure identity is preserved
|
||||
XASession session1a = connection1.createXASession();
|
||||
assertTrue(resource1.isSameRM(session1a.getXAResource()));
|
||||
session1.close();
|
||||
session2.close();
|
||||
} finally {
|
||||
if (connection1 != null) {
|
||||
try {
|
||||
connection1.close();
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
if (connection2 != null) {
|
||||
try {
|
||||
connection2.close();
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testVanilaTransactionalProduceReceive() throws Exception {
|
||||
|
||||
XAConnection connection1 = null;
|
||||
|
|
15
pom.xml
15
pom.xml
|
@ -64,7 +64,7 @@
|
|||
<geronimo-version>1.0</geronimo-version>
|
||||
<hadoop-version>1.0.0</hadoop-version>
|
||||
<hawtbuf-version>1.9</hawtbuf-version>
|
||||
<hawtdispatch-version>1.19</hawtdispatch-version>
|
||||
<hawtdispatch-version>1.20</hawtdispatch-version>
|
||||
<howl-version>0.1.8</howl-version>
|
||||
<hsqldb-version>1.8.0.12</hsqldb-version>
|
||||
<httpclient-version>4.2.5</httpclient-version>
|
||||
|
@ -90,7 +90,7 @@
|
|||
<leveldb-version>0.6</leveldb-version>
|
||||
<leveldbjni-version>1.8</leveldbjni-version>
|
||||
<log4j-version>1.2.17</log4j-version>
|
||||
<mqtt-client-version>1.7</mqtt-client-version>
|
||||
<mqtt-client-version>1.8</mqtt-client-version>
|
||||
<openjpa-version>1.2.0</openjpa-version>
|
||||
<org-apache-derby-version>10.10.1.1</org-apache-derby-version>
|
||||
<org.osgi.core-version>4.3.1</org.osgi.core-version>
|
||||
|
@ -124,7 +124,7 @@
|
|||
<xbean-version>3.15</xbean-version>
|
||||
<xerces-version>2.11.0</xerces-version>
|
||||
<jaxb-basics-version>0.6.4</jaxb-basics-version>
|
||||
<stompjms-version>1.18</stompjms-version>
|
||||
<stompjms-version>1.19</stompjms-version>
|
||||
|
||||
<pax-exam-version>2.6.0</pax-exam-version>
|
||||
<paxexam-karaf-container-version>1.0.0</paxexam-karaf-container-version>
|
||||
|
@ -1591,6 +1591,15 @@
|
|||
<releases><enabled>true</enabled></releases>
|
||||
<snapshots><enabled>false</enabled></snapshots>
|
||||
</repository>
|
||||
|
||||
<!-- for the patched proton build: http://is.gd/WjuHvH -->
|
||||
<repository>
|
||||
<id>jboss.org-fs-releases</id>
|
||||
<url>https://repository.jboss.org/nexus/content/repositories/fs-releases/</url>
|
||||
<releases><enabled>true</enabled></releases>
|
||||
<snapshots><enabled>false</enabled></snapshots>
|
||||
</repository>
|
||||
|
||||
</repositories>
|
||||
|
||||
</project>
|
||||
|
|
Loading…
Reference in New Issue