mirror of https://github.com/apache/activemq.git
Better broker shutdown handling and also better test support.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@577882 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b7b929d90f
commit
01bdc524e7
|
@ -313,36 +313,34 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
|
|||
|
||||
public Response service(Command command) {
|
||||
Response response = null;
|
||||
if (broker.getBrokerService().isStarted()) {
|
||||
boolean responseRequired = command.isResponseRequired();
|
||||
int commandId = command.getCommandId();
|
||||
try {
|
||||
response = command.visit(this);
|
||||
} catch (Throwable e) {
|
||||
if (responseRequired) {
|
||||
if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
|
||||
SERVICELOG.debug("Error occured while processing sync command: " + e, e);
|
||||
}
|
||||
response = new ExceptionResponse(e);
|
||||
} else {
|
||||
serviceException(e);
|
||||
}
|
||||
}
|
||||
boolean responseRequired = command.isResponseRequired();
|
||||
int commandId = command.getCommandId();
|
||||
try {
|
||||
response = command.visit(this);
|
||||
} catch (Throwable e) {
|
||||
if (responseRequired) {
|
||||
if (response == null) {
|
||||
response = new Response();
|
||||
if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
|
||||
SERVICELOG.debug("Error occured while processing sync command: " + e, e);
|
||||
}
|
||||
response.setCorrelationId(commandId);
|
||||
response = new ExceptionResponse(e);
|
||||
} else {
|
||||
serviceException(e);
|
||||
}
|
||||
// The context may have been flagged so that the response is not
|
||||
// sent.
|
||||
if (context != null) {
|
||||
if (context.isDontSendReponse()) {
|
||||
context.setDontSendReponse(false);
|
||||
response = null;
|
||||
}
|
||||
context = null;
|
||||
}
|
||||
if (responseRequired) {
|
||||
if (response == null) {
|
||||
response = new Response();
|
||||
}
|
||||
response.setCorrelationId(commandId);
|
||||
}
|
||||
// The context may have been flagged so that the response is not
|
||||
// sent.
|
||||
if (context != null) {
|
||||
if (context.isDontSendReponse()) {
|
||||
context.setDontSendReponse(false);
|
||||
response = null;
|
||||
}
|
||||
context = null;
|
||||
}
|
||||
return response;
|
||||
}
|
||||
|
|
|
@ -124,6 +124,7 @@ public class JmsTestSupport extends CombinationTestSupport {
|
|||
conn.close();
|
||||
} catch (Throwable e) {
|
||||
}
|
||||
iter.remove();
|
||||
}
|
||||
broker.stop();
|
||||
super.tearDown();
|
||||
|
|
|
@ -347,7 +347,7 @@ public class BrokerTestSupport extends CombinationTestSupport {
|
|||
return;
|
||||
}
|
||||
if (o instanceof MessageDispatch && ((MessageDispatch)o).getMessage() != null) {
|
||||
fail("Received a message.");
|
||||
fail("Received a message: "+((MessageDispatch)o).getMessage().getMessageId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.activemq.command.ShutdownInfo;
|
|||
import org.apache.activemq.transport.DefaultTransportListener;
|
||||
import org.apache.activemq.transport.Transport;
|
||||
import org.apache.activemq.transport.TransportFactory;
|
||||
import org.apache.activemq.transport.TransportListener;
|
||||
import org.apache.activemq.util.JMSExceptionSupport;
|
||||
import org.apache.activemq.util.ServiceSupport;
|
||||
|
||||
|
@ -38,6 +39,7 @@ public class StubConnection implements Service {
|
|||
private Connection connection;
|
||||
private Transport transport;
|
||||
private boolean shuttingDown;
|
||||
private TransportListener listener;
|
||||
|
||||
public StubConnection(BrokerService broker) throws Exception {
|
||||
this(TransportFactory.connect(broker.getVmConnectorURI()));
|
||||
|
@ -62,6 +64,9 @@ public class StubConnection implements Service {
|
|||
}
|
||||
|
||||
public void onException(IOException error) {
|
||||
if (listener != null) {
|
||||
listener.onException(error);
|
||||
}
|
||||
if (!shuttingDown) {
|
||||
error.printStackTrace();
|
||||
}
|
||||
|
@ -71,6 +76,9 @@ public class StubConnection implements Service {
|
|||
}
|
||||
|
||||
protected void dispatch(Object command) throws InterruptedException, IOException {
|
||||
if (listener != null) {
|
||||
listener.onCommand(command);
|
||||
}
|
||||
dispatchQueue.put(command);
|
||||
}
|
||||
|
||||
|
@ -140,4 +148,12 @@ public class StubConnection implements Service {
|
|||
ServiceSupport.dispose(transport);
|
||||
}
|
||||
}
|
||||
|
||||
public TransportListener getListener() {
|
||||
return listener;
|
||||
}
|
||||
|
||||
public void setListener(TransportListener listener) {
|
||||
this.listener = listener;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,6 +33,8 @@ import javax.jms.MessageProducer;
|
|||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import junit.framework.AssertionFailedError;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
|
@ -167,7 +169,15 @@ public abstract class JmsSendReceiveTestSupport extends TestSupport implements M
|
|||
|
||||
for (int i = 0; i < data.length; i++) {
|
||||
Message received = receivedMessages.get(i);
|
||||
assertMessageValid(i, received);
|
||||
try {
|
||||
assertMessageValid(i, received);
|
||||
} catch (AssertionFailedError e) {
|
||||
for (int j = 0; j < data.length; j++) {
|
||||
Message m = receivedMessages.get(j);
|
||||
System.out.println(j+" => "+m.getJMSMessageID());
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue