Push enough data through the socket so that the socket close gets detected.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1185210 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-10-17 14:33:41 +00:00
parent 6c4bcb3197
commit 970a97ac5f
1 changed files with 23 additions and 22 deletions

View File

@ -42,10 +42,10 @@ import org.slf4j.LoggerFactory;
public class SoWriteTimeoutTest extends JmsTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(SoWriteTimeoutTest.class);
final int receiveBufferSize = 16*1024;
public String brokerTransportScheme = "nio";
protected BrokerService createBroker() throws Exception {
BrokerService broker = super.createBroker();
broker.addConnector(brokerTransportScheme + "://localhost:0?transport.soWriteTimeout=1000&transport.sleep=1000&socketBufferSize="+ receiveBufferSize);
@ -54,30 +54,30 @@ public class SoWriteTimeoutTest extends JmsTestSupport {
}
return broker;
}
public void initCombosForTestWriteTimeout() {
addCombinationValues("brokerTransportScheme", new Object[]{"tcp", "nio"});
}
public void testWriteTimeout() throws Exception {
Destination dest = new ActiveMQQueue("testWriteTimeout");
messageTextPrefix = initMessagePrefix(8*1024);
sendMessages(dest, 500);
URI tcpBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(0).getConnectUri());
LOG.info("consuming using uri: " + tcpBrokerUri);
SocketProxy proxy = new SocketProxy();
proxy.setTarget(tcpBrokerUri);
proxy.setReceiveBufferSize(receiveBufferSize);
proxy.open();
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(proxy.getUrl());
Connection c = factory.createConnection();
c.start();
Session session = c.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = session.createConsumer(dest);
MessageConsumer consumer = session.createConsumer(dest);
proxy.pause();
// writes should back up... writeTimeout will kick in a abort the connection
TimeUnit.SECONDS.sleep(10);
@ -89,23 +89,24 @@ public class SoWriteTimeoutTest extends JmsTestSupport {
} catch (JMSException expected) {
}
}
public void testWriteTimeoutStompNio() throws Exception {
ActiveMQQueue dest = new ActiveMQQueue("testWriteTimeout");
messageTextPrefix = initMessagePrefix(8*1024);
sendMessages(dest, 500);
URI stompBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(1).getConnectUri());
LOG.info("consuming using uri: " + stompBrokerUri);
SocketProxy proxy = new SocketProxy();
proxy.setTarget(new URI("tcp://localhost:" + stompBrokerUri.getPort()));
proxy.setReceiveBufferSize(receiveBufferSize);
proxy.open();
StompConnection stompConnection = new StompConnection();
stompConnection.open(new Socket("localhost", proxy.getUrl().getPort()));
stompConnection.getStompSocket().setTcpNoDelay(true);
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
@ -113,31 +114,31 @@ public class SoWriteTimeoutTest extends JmsTestSupport {
frame = "SUBSCRIBE\n" + "destination:/queue/" + dest.getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
// ensure dispatch has started before pause
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
proxy.pause();
// writes should back up... writeTimeout will kick in a abort the connection
TimeUnit.SECONDS.sleep(1);
// see the blocked threads
//dumpAllThreads("blocked on write");
// abort should be done after this
TimeUnit.SECONDS.sleep(10);
proxy.goOn();
// get a buffered message
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
// verify connection is dead
try {
for (int i=0; i<100; i++) {
for (int i=0; i<200; i++) {
stompConnection.send("/queue/" + dest.getPhysicalName(), "ShouldBeDeadConnectionText" + i);
}
fail("expected send to fail with timeout out connection");
@ -145,7 +146,7 @@ public class SoWriteTimeoutTest extends JmsTestSupport {
LOG.info("got exception on send after timeout: " + expected);
}
}
private String initMessagePrefix(int i) {
byte[] content = new byte[i];
return new String(content);