mirror of https://github.com/apache/activemq.git
resolve AMQ-2004
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@718403 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6d1f57b137
commit
e53668eb23
|
@ -617,7 +617,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
|||
ackCounter = 0;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
} else if (pendingAck != null && pendingAck.isStandardAck()) {
|
||||
ack = pendingAck;
|
||||
}
|
||||
if (ack != null) {
|
||||
|
|
|
@ -304,10 +304,12 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
|||
try {
|
||||
response = command.visit(this);
|
||||
} catch (Throwable e) {
|
||||
if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
|
||||
SERVICELOG.debug("Error occured while processing "
|
||||
+ (responseRequired ? "sync": "async")
|
||||
+ " command: " + command + ", exception: " + e, e);
|
||||
}
|
||||
if (responseRequired) {
|
||||
if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
|
||||
SERVICELOG.debug("Error occured while processing sync command: " + e, e);
|
||||
}
|
||||
response = new ExceptionResponse(e);
|
||||
} else {
|
||||
serviceException(e);
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.util.Set;
|
|||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.ExceptionListener;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
|
@ -49,7 +50,7 @@ import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
|
|||
*
|
||||
* @version $Revision: 1.1 $
|
||||
*/
|
||||
public class AMQ1925Test extends TestCase {
|
||||
public class AMQ1925Test extends TestCase implements ExceptionListener {
|
||||
private static final Logger log = Logger.getLogger(AMQ1925Test.class);
|
||||
|
||||
private static final String QUEUE_NAME = "test.amq1925";
|
||||
|
@ -60,6 +61,8 @@ public class AMQ1925Test extends TestCase {
|
|||
private URI tcpUri;
|
||||
private ActiveMQConnectionFactory cf;
|
||||
|
||||
private JMSException exception;
|
||||
|
||||
public void XtestAMQ1925_TXInProgress() throws Exception {
|
||||
Connection connection = cf.createConnection();
|
||||
connection.start();
|
||||
|
@ -255,6 +258,7 @@ public class AMQ1925Test extends TestCase {
|
|||
public void testAMQ1925_TXBegin() throws Exception {
|
||||
Connection connection = cf.createConnection();
|
||||
connection.start();
|
||||
connection.setExceptionListener(this);
|
||||
Session session = connection.createSession(true,
|
||||
Session.SESSION_TRANSACTED);
|
||||
MessageConsumer consumer = session.createConsumer(session
|
||||
|
@ -284,6 +288,7 @@ public class AMQ1925Test extends TestCase {
|
|||
connection.close();
|
||||
|
||||
assertQueueEmpty();
|
||||
assertNull("no exception on connection listener: " + exception, exception);
|
||||
}
|
||||
|
||||
public void testAMQ1925_TXCommited() throws Exception {
|
||||
|
@ -371,6 +376,7 @@ public class AMQ1925Test extends TestCase {
|
|||
}
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
exception = null;
|
||||
bs = new BrokerService();
|
||||
bs.setDeleteAllMessagesOnStartup(true);
|
||||
bs.setPersistent(true);
|
||||
|
@ -388,4 +394,8 @@ public class AMQ1925Test extends TestCase {
|
|||
new ServiceStopper().stop(bs);
|
||||
}
|
||||
|
||||
public void onException(JMSException exception) {
|
||||
this.exception = exception;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue