git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1350657 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-06-15 15:35:04 +00:00
parent 4e011e0a85
commit e0bb36b4b7
2 changed files with 160 additions and 1 deletions

View File

@ -106,7 +106,9 @@ public class VMTransport implements Transport, Task {
public void dispatch(VMTransport transport, BlockingQueue<Object> pending, Object command) { public void dispatch(VMTransport transport, BlockingQueue<Object> pending, Object command) {
TransportListener transportListener = transport.getTransportListener(); TransportListener transportListener = transport.getTransportListener();
if (transportListener != null) { if (transportListener != null) {
synchronized (started) { // Lock here on the target transport's started since we want to wait for its start()
// method to finish dispatching out of the queue before we do our own.
synchronized (transport.started) {
// Ensure that no additional commands entered the queue in the small time window // Ensure that no additional commands entered the queue in the small time window
// before the start method locks the dispatch lock and the oneway method was in // before the start method locks the dispatch lock and the oneway method was in

View File

@ -32,6 +32,8 @@ import org.apache.activemq.command.BaseCommand;
import org.apache.activemq.command.Response; import org.apache.activemq.command.Response;
import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.state.CommandVisitor; import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException; import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportListener; import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
@ -107,6 +109,47 @@ public class VMTransportThreadSafeTest {
} }
} }
private class VMResponderTransportListener implements TransportListener {
protected final Queue<DummyCommand> received;
private final Transport peer;
public VMResponderTransportListener(Queue<DummyCommand> receiveQueue, Transport peer) {
this.received = receiveQueue;
this.peer = peer;
}
@Override
public void onCommand(Object command) {
if (command instanceof ShutdownInfo) {
return;
} else {
received.add((DummyCommand) command);
if (peer != null) {
try {
peer.oneway(command);
} catch (IOException e) {
}
}
}
}
@Override
public void onException(IOException error) {
}
@Override
public void transportInterupted() {
}
@Override
public void transportResumed() {
}
}
private class SlowVMTestTransportListener extends VMTestTransportListener { private class SlowVMTestTransportListener extends VMTestTransportListener {
private final TimeUnit delayUnit; private final TimeUnit delayUnit;
@ -714,4 +757,118 @@ public class VMTransportThreadSafeTest {
return endTime - startTime; return endTime - startTime;
} }
@Test(timeout=120000)
public void testTwoWayTrafficWithMutexTransportSync1() throws Exception {
for (int i = 0; i < 20; ++i) {
doTestTwoWayTrafficWithMutexTransport(false, false);
}
}
@Test(timeout=120000)
public void testTwoWayTrafficWithMutexTransportSync2() throws Exception {
for (int i = 0; i < 20; ++i) {
doTestTwoWayTrafficWithMutexTransport(true, false);
}
}
@Test(timeout=120000)
public void testTwoWayTrafficWithMutexTransportSync3() throws Exception {
for (int i = 0; i < 20; ++i) {
doTestTwoWayTrafficWithMutexTransport(false, true);
}
}
@Test(timeout=120000)
public void testTwoWayTrafficWithMutexTransportSync4() throws Exception {
for (int i = 0; i < 20; ++i) {
doTestTwoWayTrafficWithMutexTransport(false, false);
}
}
public void doTestTwoWayTrafficWithMutexTransport(boolean localAsync, boolean remoteAsync) throws Exception {
final VMTransport vmlocal = new VMTransport(new URI(location1));
final VMTransport vmremote = new VMTransport(new URI(location2));
final MutexTransport local = new MutexTransport(vmlocal);
final MutexTransport remote = new MutexTransport(vmremote);
final AtomicInteger sequenceId = new AtomicInteger();
vmlocal.setAsync(localAsync);
vmremote.setAsync(remoteAsync);
vmlocal.setPeer(vmremote);
vmremote.setPeer(vmlocal);
local.setTransportListener(new VMTestTransportListener(localReceived));
remote.setTransportListener(new VMResponderTransportListener(remoteReceived, remote));
final int messageCount = 200000;
Thread localSend = new Thread(new Runnable() {
@Override
public void run() {
for(int i = 0; i < messageCount; ++i) {
try {
local.oneway(new DummyCommand(sequenceId.incrementAndGet()));
} catch (Exception e) {
}
}
}
});
Thread remoteSend = new Thread(new Runnable() {
@Override
public void run() {
for(int i = 0; i < messageCount; ++i) {
try {
remote.oneway(new DummyCommand(sequenceId.incrementAndGet()));
} catch (Exception e) {
}
}
}
});
localSend.start();
remoteSend.start();
Thread.sleep(10);
local.start();
remote.start();
// Wait for both to finish and then check that each side go the correct amount
localSend.join();
remoteSend.join();
assertTrue("Remote should have received ("+messageCount+") but got ()" + remoteReceived.size(), Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return remoteReceived.size() == messageCount;
}
}));
assertTrue("Local should have received ("+messageCount*2+") but got ()" + localReceived.size(), Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return localReceived.size() == messageCount*2;
}
}));
LOG.debug("All messages sent,stop all");
local.stop();
remote.stop();
localReceived.clear();
remoteReceived.clear();
}
} }