mirror of https://github.com/apache/activemq.git
AMQ-2079 improve error messages in AMQConnection.createSession, and make ManagedConnectionProxy only supply consistent parameters
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@735914 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
76b7822cf2
commit
ab438f68c1
|
@ -297,8 +297,13 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
||||||
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
|
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
|
||||||
checkClosedOrFailed();
|
checkClosedOrFailed();
|
||||||
ensureConnectionInfoSent();
|
ensureConnectionInfoSent();
|
||||||
if(!transacted && acknowledgeMode==Session.SESSION_TRANSACTED) {
|
if(!transacted) {
|
||||||
|
if (acknowledgeMode==Session.SESSION_TRANSACTED) {
|
||||||
throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
|
throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
|
||||||
|
} else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
|
||||||
|
throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
|
||||||
|
"Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
|
return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
|
||||||
? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
|
? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
|
||||||
|
|
|
@ -140,6 +140,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
* is called
|
* is called
|
||||||
*/
|
*/
|
||||||
public static final int INDIVIDUAL_ACKNOWLEDGE = 4;
|
public static final int INDIVIDUAL_ACKNOWLEDGE = 4;
|
||||||
|
public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE;
|
||||||
|
|
||||||
public static interface DeliveryListener {
|
public static interface DeliveryListener {
|
||||||
void beforeDelivery(ActiveMQSession session, Message msg);
|
void beforeDelivery(ActiveMQSession session, Message msg);
|
||||||
|
|
|
@ -85,6 +85,8 @@ public class ManagedConnectionProxy implements Connection, QueueConnection, Topi
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
* @return "physical" underlying activemq connection, if proxy is associated with a managed connection
|
||||||
|
* @throws javax.jms.JMSException if managed connection is null
|
||||||
*/
|
*/
|
||||||
private Connection getConnection() throws JMSException {
|
private Connection getConnection() throws JMSException {
|
||||||
if (managedConnection == null) {
|
if (managedConnection == null) {
|
||||||
|
@ -94,22 +96,26 @@ public class ManagedConnectionProxy implements Connection, QueueConnection, Topi
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param transacted
|
* @param transacted Whether session is transacted
|
||||||
* @param acknowledgeMode
|
* @param acknowledgeMode session acknowledge mode
|
||||||
* @return
|
* @return session proxy
|
||||||
* @throws JMSException
|
* @throws JMSException on error
|
||||||
*/
|
*/
|
||||||
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
|
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
|
||||||
return createSessionProxy(transacted, acknowledgeMode);
|
return createSessionProxy(transacted, acknowledgeMode);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param acknowledgeMode
|
* @param transacted Whether session is transacted
|
||||||
* @param transacted
|
* @param acknowledgeMode session acknowledge mode
|
||||||
* @return
|
* @return session proxy
|
||||||
* @throws JMSException
|
* @throws JMSException on error
|
||||||
*/
|
*/
|
||||||
private ManagedSessionProxy createSessionProxy(boolean transacted, int acknowledgeMode) throws JMSException {
|
private ManagedSessionProxy createSessionProxy(boolean transacted, int acknowledgeMode) throws JMSException {
|
||||||
|
if (!transacted && acknowledgeMode == Session.SESSION_TRANSACTED) {
|
||||||
|
acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
|
||||||
|
}
|
||||||
|
// ActiveMQSession session = (ActiveMQSession)getConnection().createSession(true, acknowledgeMode);
|
||||||
ActiveMQSession session = (ActiveMQSession)getConnection().createSession(transacted, acknowledgeMode);
|
ActiveMQSession session = (ActiveMQSession)getConnection().createSession(transacted, acknowledgeMode);
|
||||||
ManagedTransactionContext txContext = new ManagedTransactionContext(managedConnection.getTransactionContext());
|
ManagedTransactionContext txContext = new ManagedTransactionContext(managedConnection.getTransactionContext());
|
||||||
session.setTransactionContext(txContext);
|
session.setTransactionContext(txContext);
|
||||||
|
@ -120,27 +126,26 @@ public class ManagedConnectionProxy implements Connection, QueueConnection, Topi
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setUseSharedTxContext(boolean enable) throws JMSException {
|
public void setUseSharedTxContext(boolean enable) throws JMSException {
|
||||||
for (Iterator<ManagedSessionProxy> iter = sessions.iterator(); iter.hasNext();) {
|
for (ManagedSessionProxy p : sessions) {
|
||||||
ManagedSessionProxy p = iter.next();
|
|
||||||
p.setUseSharedTxContext(enable);
|
p.setUseSharedTxContext(enable);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param transacted
|
* @param transacted Whether session is transacted
|
||||||
* @param acknowledgeMode
|
* @param acknowledgeMode session acknowledge mode
|
||||||
* @return
|
* @return session proxy
|
||||||
* @throws JMSException
|
* @throws JMSException on error
|
||||||
*/
|
*/
|
||||||
public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
|
public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
|
||||||
return new ActiveMQQueueSession(createSessionProxy(transacted, acknowledgeMode));
|
return new ActiveMQQueueSession(createSessionProxy(transacted, acknowledgeMode));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param transacted
|
* @param transacted Whether session is transacted
|
||||||
* @param acknowledgeMode
|
* @param acknowledgeMode session acknowledge mode
|
||||||
* @return
|
* @return session proxy
|
||||||
* @throws JMSException
|
* @throws JMSException on error
|
||||||
*/
|
*/
|
||||||
public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
|
public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
|
||||||
return new ActiveMQTopicSession(createSessionProxy(transacted, acknowledgeMode));
|
return new ActiveMQTopicSession(createSessionProxy(transacted, acknowledgeMode));
|
||||||
|
|
Loading…
Reference in New Issue