https://issues.apache.org/jira/browse/AMQ-5264 add useSessionArgs attribute to allow transacted session creation - where user does demarcation using plain jms api. Default behaviour is to only allow container local or xa transaction demarcation

This commit is contained in:
gtully 2014-07-22 13:12:58 +01:00
parent b9fd189d56
commit 9e139017e4
6 changed files with 84 additions and 8 deletions

View File

@ -38,6 +38,7 @@ public class ActiveMQConnectionRequestInfo implements ConnectionRequestInfo, Ser
private Boolean useInboundSession;
private RedeliveryPolicy redeliveryPolicy;
private ActiveMQPrefetchPolicy prefetchPolicy;
private Boolean useSessionArgs;
public ActiveMQConnectionRequestInfo copy() {
try {
@ -85,6 +86,9 @@ public class ActiveMQConnectionRequestInfo implements ConnectionRequestInfo, Ser
if (useInboundSession != null) {
rc ^= useInboundSession.hashCode();
}
if (useSessionArgs != null) {
rc ^= useSessionArgs.hashCode();
}
if (serverUrl != null) {
rc ^= serverUrl.hashCode();
}
@ -108,6 +112,9 @@ public class ActiveMQConnectionRequestInfo implements ConnectionRequestInfo, Ser
if (notEqual(useInboundSession, i.useInboundSession)) {
return false;
}
if (notEqual(useSessionArgs, i.useSessionArgs)) {
return false;
}
return true;
}
@ -181,6 +188,7 @@ public class ActiveMQConnectionRequestInfo implements ConnectionRequestInfo, Ser
.append(", serverUrl = '").append(serverUrl).append("' ")
.append(", clientid = '").append(clientid).append("' ")
.append(", userName = '").append(userName).append("' ")
.append(", useSessionArgs = '").append(useSessionArgs).append("' ")
.append(", useInboundSession = '").append(useInboundSession).append("' }")
.toString();
}
@ -334,4 +342,16 @@ public class ActiveMQConnectionRequestInfo implements ConnectionRequestInfo, Ser
}
return prefetchPolicy;
}
public boolean isUseSessionArgs() {
return useSessionArgs != null ? useSessionArgs.booleanValue() : false;
}
public Boolean getUseSessionArgs() {
return useSessionArgs;
}
public void setUseSessionArgs(Boolean useSessionArgs) {
this.useSessionArgs = useSessionArgs;
}
}

View File

@ -103,6 +103,9 @@ public class ActiveMQConnectionSupport {
*/
protected void setInfo(ActiveMQConnectionRequestInfo connectionRequestInfo){
info = connectionRequestInfo;
if ( log.isDebugEnabled() ) {
log.debug(this + ", setting [info] to: " + info);
}
}
protected boolean notEqual(Object o1, Object o2) {
@ -145,7 +148,7 @@ public class ActiveMQConnectionSupport {
*/
public void setClientid(String clientid) {
if ( log.isDebugEnabled() ) {
log.debug("setting [clientid] to: " + clientid);
log.debug(this + ", setting [clientid] to: " + clientid);
}
info.setClientid(clientid);
}
@ -162,7 +165,7 @@ public class ActiveMQConnectionSupport {
*/
public void setPassword(String password) {
if ( log.isDebugEnabled() ) {
log.debug("setting [password] property");
log.debug(this + ", setting [password] property");
}
info.setPassword(password);
}
@ -179,7 +182,7 @@ public class ActiveMQConnectionSupport {
*/
public void setServerUrl(String url) {
if ( log.isDebugEnabled() ) {
log.debug("setting [serverUrl] to: " + url);
log.debug(this + ", setting [serverUrl] to: " + url);
}
info.setServerUrl(url);
}
@ -420,4 +423,28 @@ public class ActiveMQConnectionSupport {
info.setUseInboundSession(useInboundSession);
}
public boolean isUseSessionArgs() {
return info.isUseSessionArgs();
}
public Boolean getUseSessionArgs() {
return info.getUseSessionArgs();
}
/**
* if true, calls to managed connection factory.connection.createSession will
* respect the passed in args. When false (default) the args are ignored b/c
* the container will do transaction demarcation via xa or local transaction rar
* contracts.
* This option is useful when a managed connection is used in plain jms mode
* and a jms transacted session session is required.
* @param useSessionArgs
*/
public void setUseSessionArgs(Boolean useSessionArgs) {
if ( log.isDebugEnabled() ) {
log.debug(this + ", setting [useSessionArgs] to: " + useSessionArgs);
}
info.setUseSessionArgs(useSessionArgs);
}
}

View File

@ -181,7 +181,7 @@ public class ActiveMQManagedConnection implements ManagedConnection, ExceptionLi
* javax.resource.spi.ConnectionRequestInfo)
*/
public Object getConnection(Subject subject, ConnectionRequestInfo info) throws ResourceException {
ManagedConnectionProxy proxy = new ManagedConnectionProxy(this);
ManagedConnectionProxy proxy = new ManagedConnectionProxy(this, info);
proxyConnections.add(proxy);
return proxy;
}

View File

@ -55,7 +55,7 @@ public class ActiveMQManagedConnectionFactory extends ActiveMQConnectionSupport
else
{
if ( log.isDebugEnabled() ) {
log.debug("copying standard ResourceAdapter configuration properties");
log.debug(this + ", copying standard ResourceAdapter configuration properties");
}
ActiveMQConnectionRequestInfo baseInfo = ((MessageResourceAdapter) adapter).getInfo().copy();
@ -71,6 +71,9 @@ public class ActiveMQManagedConnectionFactory extends ActiveMQConnectionSupport
if (getUseInboundSession() == null) {
setUseInboundSession(baseInfo.getUseInboundSession());
}
if (getUseSessionArgs() == null) {
setUseSessionArgs(baseInfo.isUseSessionArgs());
}
if (getUserName() == null) {
setUserName(baseInfo.getUserName());
}

View File

@ -34,6 +34,7 @@ import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.resource.spi.ConnectionRequestInfo;
import org.apache.activemq.ActiveMQQueueSession;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.ActiveMQTopicSession;
@ -49,9 +50,13 @@ public class ManagedConnectionProxy implements Connection, QueueConnection, Topi
private ActiveMQManagedConnection managedConnection;
private final List<ManagedSessionProxy> sessions = new ArrayList<ManagedSessionProxy>();
private ExceptionListener exceptionListener;
private ActiveMQConnectionRequestInfo info;
public ManagedConnectionProxy(ActiveMQManagedConnection managedConnection) {
public ManagedConnectionProxy(ActiveMQManagedConnection managedConnection, ConnectionRequestInfo info) {
this.managedConnection = managedConnection;
if (info instanceof ActiveMQConnectionRequestInfo) {
this.info = (ActiveMQConnectionRequestInfo) info;
}
}
/**
@ -112,7 +117,12 @@ public class ManagedConnectionProxy implements Connection, QueueConnection, Topi
* @throws JMSException on error
*/
private ManagedSessionProxy createSessionProxy(boolean transacted, int acknowledgeMode) throws JMSException {
ActiveMQSession session = (ActiveMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQSession session;
if (info != null && info.isUseSessionArgs()) {
session = (ActiveMQSession) getConnection().createSession(transacted, transacted ? Session.SESSION_TRANSACTED : acknowledgeMode);
} else {
session = (ActiveMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
}
ManagedTransactionContext txContext = new ManagedTransactionContext(managedConnection.getTransactionContext());
session.setTransactionContext(txContext);
ManagedSessionProxy p = new ManagedSessionProxy(session, this);

View File

@ -57,6 +57,7 @@ public class ManagedConnectionFactoryTest extends TestCase {
managedConnectionFactory.setServerUrl(DEFAULT_HOST);
managedConnectionFactory.setUserName(ActiveMQConnectionFactory.DEFAULT_USER);
managedConnectionFactory.setPassword(ActiveMQConnectionFactory.DEFAULT_PASSWORD);
managedConnectionFactory.setUseSessionArgs(false);
}
public void testConnectionFactoryAllocation() throws ResourceException, JMSException {
@ -96,6 +97,21 @@ public class ManagedConnectionFactoryTest extends TestCase {
}
public void testConnectionSessionArgs() throws ResourceException, JMSException {
ActiveMQConnectionRequestInfo connectionRequestInfo = new ActiveMQConnectionRequestInfo();
connectionRequestInfo.setServerUrl(DEFAULT_HOST);
connectionRequestInfo.setUserName(ActiveMQConnectionFactory.DEFAULT_USER);
connectionRequestInfo.setPassword(ActiveMQConnectionFactory.DEFAULT_PASSWORD);
connectionRequestInfo.setUseSessionArgs(true);
ManagedConnection managedConnection = managedConnectionFactory.createManagedConnection(null, connectionRequestInfo);
Connection connection = (Connection) managedConnection.getConnection(null, connectionRequestInfo);
Session session = connection.createSession(true, 0);
assertTrue("transacted attribute is respected", session.getTransacted());
connection.close();
}
public void testConnectionFactoryConnectionMatching() throws ResourceException, JMSException {
ActiveMQConnectionRequestInfo ri1 = new ActiveMQConnectionRequestInfo();