From e0bb36b4b76f8a4c2a7998314c519aece231147e Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Fri, 15 Jun 2012 15:35:04 +0000 Subject: [PATCH] fix and test for: https://issues.apache.org/jira/browse/AMQ-3873 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1350657 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/transport/vm/VMTransport.java | 4 +- .../vm/VMTransportThreadSafeTest.java | 157 ++++++++++++++++++ 2 files changed, 160 insertions(+), 1 deletion(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java index 5a78fd7973..74c44199ed 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java @@ -106,7 +106,9 @@ public class VMTransport implements Transport, Task { public void dispatch(VMTransport transport, BlockingQueue pending, Object command) { TransportListener transportListener = transport.getTransportListener(); if (transportListener != null) { - synchronized (started) { + // Lock here on the target transport's started since we want to wait for its start() + // method to finish dispatching out of the queue before we do our own. + synchronized (transport.started) { // Ensure that no additional commands entered the queue in the small time window // before the start method locks the dispatch lock and the oneway method was in diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java index 2dd34282f9..4a1a211489 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java @@ -32,6 +32,8 @@ import org.apache.activemq.command.BaseCommand; import org.apache.activemq.command.Response; import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.state.CommandVisitor; +import org.apache.activemq.transport.MutexTransport; +import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportDisposedIOException; import org.apache.activemq.transport.TransportListener; import org.apache.activemq.util.Wait; @@ -107,6 +109,47 @@ public class VMTransportThreadSafeTest { } } + private class VMResponderTransportListener implements TransportListener { + + protected final Queue received; + + private final Transport peer; + + public VMResponderTransportListener(Queue receiveQueue, Transport peer) { + this.received = receiveQueue; + this.peer = peer; + } + + @Override + public void onCommand(Object command) { + + if (command instanceof ShutdownInfo) { + return; + } else { + received.add((DummyCommand) command); + + if (peer != null) { + try { + peer.oneway(command); + } catch (IOException e) { + } + } + } + } + + @Override + public void onException(IOException error) { + } + + @Override + public void transportInterupted() { + } + + @Override + public void transportResumed() { + } + } + private class SlowVMTestTransportListener extends VMTestTransportListener { private final TimeUnit delayUnit; @@ -714,4 +757,118 @@ public class VMTransportThreadSafeTest { return endTime - startTime; } + @Test(timeout=120000) + public void testTwoWayTrafficWithMutexTransportSync1() throws Exception { + + for (int i = 0; i < 20; ++i) { + doTestTwoWayTrafficWithMutexTransport(false, false); + } + } + + @Test(timeout=120000) + public void testTwoWayTrafficWithMutexTransportSync2() throws Exception { + + for (int i = 0; i < 20; ++i) { + doTestTwoWayTrafficWithMutexTransport(true, false); + } + } + + @Test(timeout=120000) + public void testTwoWayTrafficWithMutexTransportSync3() throws Exception { + + for (int i = 0; i < 20; ++i) { + doTestTwoWayTrafficWithMutexTransport(false, true); + } + } + + @Test(timeout=120000) + public void testTwoWayTrafficWithMutexTransportSync4() throws Exception { + + for (int i = 0; i < 20; ++i) { + doTestTwoWayTrafficWithMutexTransport(false, false); + } + } + + public void doTestTwoWayTrafficWithMutexTransport(boolean localAsync, boolean remoteAsync) throws Exception { + + final VMTransport vmlocal = new VMTransport(new URI(location1)); + final VMTransport vmremote = new VMTransport(new URI(location2)); + + final MutexTransport local = new MutexTransport(vmlocal); + final MutexTransport remote = new MutexTransport(vmremote); + + final AtomicInteger sequenceId = new AtomicInteger(); + + vmlocal.setAsync(localAsync); + vmremote.setAsync(remoteAsync); + + vmlocal.setPeer(vmremote); + vmremote.setPeer(vmlocal); + + local.setTransportListener(new VMTestTransportListener(localReceived)); + remote.setTransportListener(new VMResponderTransportListener(remoteReceived, remote)); + + final int messageCount = 200000; + + Thread localSend = new Thread(new Runnable() { + + @Override + public void run() { + for(int i = 0; i < messageCount; ++i) { + try { + local.oneway(new DummyCommand(sequenceId.incrementAndGet())); + } catch (Exception e) { + } + } + } + }); + + Thread remoteSend = new Thread(new Runnable() { + + @Override + public void run() { + for(int i = 0; i < messageCount; ++i) { + try { + remote.oneway(new DummyCommand(sequenceId.incrementAndGet())); + } catch (Exception e) { + } + } + } + }); + + localSend.start(); + remoteSend.start(); + + Thread.sleep(10); + + local.start(); + remote.start(); + + // Wait for both to finish and then check that each side go the correct amount + localSend.join(); + remoteSend.join(); + + assertTrue("Remote should have received ("+messageCount+") but got ()" + remoteReceived.size(), Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return remoteReceived.size() == messageCount; + } + })); + + assertTrue("Local should have received ("+messageCount*2+") but got ()" + localReceived.size(), Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return localReceived.size() == messageCount*2; + } + })); + + LOG.debug("All messages sent,stop all"); + + local.stop(); + remote.stop(); + + localReceived.clear(); + remoteReceived.clear(); + } + }