diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java b/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java index 9f3005ec63..f394b3ed1f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java @@ -24,6 +24,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.activemq.command.KeepAliveInfo; import org.apache.activemq.command.WireFormatInfo; @@ -56,6 +57,8 @@ public abstract class AbstractInactivityMonitor extends TransportFilter { private final AtomicBoolean inReceive = new AtomicBoolean(false); private final AtomicInteger lastReceiveCounter = new AtomicInteger(0); + private final ReentrantReadWriteLock sendLock = new ReentrantReadWriteLock(); + private SchedulerTimerTask writeCheckerTask; private SchedulerTimerTask readCheckerTask; @@ -140,11 +143,17 @@ public abstract class AbstractInactivityMonitor extends TransportFilter { public void run() { if (monitorStarted.get()) { try { - KeepAliveInfo info = new KeepAliveInfo(); - info.setResponseRequired(keepAliveResponseRequired); - oneway(info); + // If we can't get the lock it means another write beat us into the + // send and we don't need to heart beat now. + if (sendLock.writeLock().tryLock()) { + KeepAliveInfo info = new KeepAliveInfo(); + info.setResponseRequired(keepAliveResponseRequired); + doOnewaySend(info); + } } catch (IOException e) { onException(e); + } finally { + sendLock.writeLock().unlock(); } } }; @@ -175,7 +184,6 @@ public abstract class AbstractInactivityMonitor extends TransportFilter { public void run() { onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: "+next.getRemoteAddress())); }; - }); } else { if (LOG.isTraceEnabled()) { @@ -195,11 +203,14 @@ public abstract class AbstractInactivityMonitor extends TransportFilter { if (command.getClass() == KeepAliveInfo.class) { KeepAliveInfo info = (KeepAliveInfo) command; if (info.isResponseRequired()) { + sendLock.readLock().lock(); try { info.setResponseRequired(false); oneway(info); } catch (IOException e) { onException(e); + } finally { + sendLock.readLock().unlock(); } } } else { @@ -212,39 +223,41 @@ public abstract class AbstractInactivityMonitor extends TransportFilter { } } } - synchronized (readChecker) { - transportListener.onCommand(command); - } + + transportListener.onCommand(command); } } finally { - inReceive.set(false); } } public void oneway(Object o) throws IOException { - // Disable inactivity monitoring while processing a command. - // synchronize this method - its not synchronized - // further down the transport stack and gets called by more - // than one thread by this class - synchronized(inSend) { - inSend.set(true); - try { + // To prevent the inactivity monitor from sending a message while we + // are performing a send we take a read lock. The inactivity monitor + // sends its Heart-beat commands under a write lock. This means that + // the MutexTransport is still responsible for synchronizing sends + this.sendLock.readLock().lock(); + inSend.set(true); + try { + doOnewaySend(o); + } finally { + commandSent.set(true); + inSend.set(false); + this.sendLock.readLock().unlock(); + } + } - if( failed.get() ) { - throw new InactivityIOException("Cannot send, channel has already failed: "+next.getRemoteAddress()); - } - if (o.getClass() == WireFormatInfo.class) { - synchronized (this) { - processOutboundWireFormatInfo((WireFormatInfo) o); - } - } - next.oneway(o); - } finally { - commandSent.set(true); - inSend.set(false); + // Must be called under lock, either read or write on sendLock. + private void doOnewaySend(Object command) throws IOException { + if( failed.get() ) { + throw new InactivityIOException("Cannot send, channel has already failed: "+next.getRemoteAddress()); + } + if (command.getClass() == WireFormatInfo.class) { + synchronized (this) { + processOutboundWireFormatInfo((WireFormatInfo) command); } } + next.oneway(command); } public void onException(IOException error) { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java index 372fcc0c3f..9aa5eeed4d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java @@ -17,44 +17,90 @@ package org.apache.activemq.transport; import java.io.IOException; +import java.util.concurrent.locks.ReentrantLock; /** - * + * Thread safe Transport Filter that serializes calls to and from the Transport Stack. */ public class MutexTransport extends TransportFilter { - private final Object writeMutex = new Object(); + private final ReentrantLock wreiteLock = new ReentrantLock(); + private boolean syncOnCommand; public MutexTransport(Transport next) { super(next); + this.syncOnCommand = false; } + public MutexTransport(Transport next, boolean syncOnCommand) { + super(next); + this.syncOnCommand = syncOnCommand; + } + + @Override + public void onCommand(Object command) { + if (syncOnCommand) { + wreiteLock.lock(); + try { + transportListener.onCommand(command); + } finally { + wreiteLock.unlock(); + } + } else { + transportListener.onCommand(command); + } + } + + @Override public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { - synchronized (writeMutex) { + wreiteLock.lock(); + try { return next.asyncRequest(command, null); + } finally { + wreiteLock.unlock(); } } + @Override public void oneway(Object command) throws IOException { - synchronized (writeMutex) { + wreiteLock.lock(); + try { next.oneway(command); + } finally { + wreiteLock.unlock(); } } + @Override public Object request(Object command) throws IOException { - synchronized (writeMutex) { + wreiteLock.lock(); + try { return next.request(command); + } finally { + wreiteLock.unlock(); } } + @Override public Object request(Object command, int timeout) throws IOException { - synchronized (writeMutex) { + wreiteLock.lock(); + try { return next.request(command, timeout); + } finally { + wreiteLock.unlock(); } } + @Override public String toString() { return next.toString(); } + public boolean isSyncOnCommand() { + return syncOnCommand; + } + + public void setSyncOnCommand(boolean syncOnCommand) { + this.syncOnCommand = syncOnCommand; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java index 3d2869518f..4b6255a9b1 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java @@ -166,16 +166,7 @@ public class ProtocolConverter { command.setResponseRequired(true); resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler); } - stompTransport.asyncSendToActiveMQ(command); - } - - protected void asyncSendToActiveMQ(Command command, ResponseHandler handler) { - command.setCommandId(generateCommandId()); - if (handler != null) { - command.setResponseRequired(true); - resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler); - } - stompTransport.asyncSendToActiveMQ(command); + stompTransport.sendToActiveMQ(command); } protected void sendToStomp(StompFrame command) throws IOException { @@ -301,7 +292,7 @@ public class ProtocolConverter { } message.onSend(); - asyncSendToActiveMQ(message, createResponseHandler(command)); + sendToActiveMQ(message, createResponseHandler(command)); } protected void onStompNack(StompFrame command) throws ProtocolException { @@ -338,7 +329,7 @@ public class ProtocolConverter { if (sub != null) { MessageAck ack = sub.onStompMessageNack(messageId, activemqTx); if (ack != null) { - asyncSendToActiveMQ(ack, createResponseHandler(command)); + sendToActiveMQ(ack, createResponseHandler(command)); } else { throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]"); } @@ -377,7 +368,7 @@ public class ProtocolConverter { if (sub != null) { MessageAck ack = sub.onStompMessageAck(messageId, activemqTx); if (ack != null) { - asyncSendToActiveMQ(ack, createResponseHandler(command)); + sendToActiveMQ(ack, createResponseHandler(command)); acked = true; } } @@ -391,7 +382,7 @@ public class ProtocolConverter { for (StompSubscription sub : subscriptionsByConsumerId.values()) { MessageAck ack = sub.onStompMessageAck(messageId, activemqTx); if (ack != null) { - asyncSendToActiveMQ(ack, createResponseHandler(command)); + sendToActiveMQ(ack, createResponseHandler(command)); acked = true; break; } @@ -426,7 +417,7 @@ public class ProtocolConverter { tx.setTransactionId(activemqTx); tx.setType(TransactionInfo.BEGIN); - asyncSendToActiveMQ(tx, createResponseHandler(command)); + sendToActiveMQ(tx, createResponseHandler(command)); } protected void onStompCommit(StompFrame command) throws ProtocolException { @@ -453,7 +444,7 @@ public class ProtocolConverter { tx.setTransactionId(activemqTx); tx.setType(TransactionInfo.COMMIT_ONE_PHASE); - asyncSendToActiveMQ(tx, createResponseHandler(command)); + sendToActiveMQ(tx, createResponseHandler(command)); } protected void onStompAbort(StompFrame command) throws ProtocolException { @@ -482,7 +473,7 @@ public class ProtocolConverter { tx.setTransactionId(activemqTx); tx.setType(TransactionInfo.ROLLBACK); - asyncSendToActiveMQ(tx, createResponseHandler(command)); + sendToActiveMQ(tx, createResponseHandler(command)); } protected void onStompSubscribe(StompFrame command) throws ProtocolException { @@ -550,7 +541,7 @@ public class ProtocolConverter { // dispatch can beat the receipt so send it early sendReceipt(command); - asyncSendToActiveMQ(consumerInfo, null); + sendToActiveMQ(consumerInfo, null); } protected void onStompUnsubscribe(StompFrame command) throws ProtocolException { @@ -579,7 +570,7 @@ public class ProtocolConverter { info.setClientId(durable); info.setSubscriptionName(durable); info.setConnectionId(connectionId); - asyncSendToActiveMQ(info, createResponseHandler(command)); + sendToActiveMQ(info, createResponseHandler(command)); return; } @@ -587,7 +578,7 @@ public class ProtocolConverter { StompSubscription sub = this.subscriptions.remove(subscriptionId); if (sub != null) { - asyncSendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command)); + sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command)); return; } @@ -598,7 +589,7 @@ public class ProtocolConverter { for (Iterator iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) { StompSubscription sub = iter.next(); if (destination != null && destination.equals(sub.getDestination())) { - asyncSendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command)); + sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command)); iter.remove(); return; } @@ -721,8 +712,8 @@ public class ProtocolConverter { protected void onStompDisconnect(StompFrame command) throws ProtocolException { checkConnected(); - asyncSendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command)); - asyncSendToActiveMQ(new ShutdownInfo(), createResponseHandler(command)); + sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command)); + sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command)); connected.set(false); } @@ -787,7 +778,7 @@ public class ProtocolConverter { ActiveMQDestination rc = tempDestinations.get(name); if( rc == null ) { rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId()); - asyncSendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null); + sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null); tempDestinations.put(name, rc); } return rc; @@ -797,7 +788,7 @@ public class ProtocolConverter { ActiveMQDestination rc = tempDestinations.get(name); if( rc == null ) { rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId()); - asyncSendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null); + sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null); tempDestinations.put(name, rc); tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java index 42c8cccfbc..bc1fcfeeda 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java @@ -21,6 +21,7 @@ import java.net.Socket; import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; +import java.util.HashMap; import java.util.Map; import javax.net.ServerSocketFactory; @@ -29,6 +30,7 @@ import javax.net.SocketFactory; import org.apache.activemq.broker.BrokerContext; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; +import org.apache.activemq.transport.MutexTransport; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.nio.NIOTransportFactory; import org.apache.activemq.transport.tcp.TcpTransport; @@ -61,6 +63,19 @@ public class StompNIOTransportFactory extends NIOTransportFactory implements Bro return new StompNIOTransport(wf, socketFactory, location, localLocation); } + @SuppressWarnings("rawtypes") + @Override + public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception { + transport = super.serverConfigure(transport, format, options); + + MutexTransport mutex = transport.narrow(MutexTransport.class); + if (mutex != null) { + mutex.setSyncOnCommand(true); + } + + return transport; + } + @SuppressWarnings("rawtypes") public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { transport = new StompTransportFilter(transport, format, brokerContext); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java index 33d27b47cc..8a3792899b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java @@ -16,11 +16,13 @@ */ package org.apache.activemq.transport.stomp; +import java.util.HashMap; import java.util.Map; import org.apache.activemq.broker.BrokerContext; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; +import org.apache.activemq.transport.MutexTransport; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.tcp.SslTransportFactory; import org.apache.activemq.util.IntrospectionSupport; @@ -46,6 +48,19 @@ public class StompSslTransportFactory extends SslTransportFactory implements Bro return super.compositeConfigure(transport, format, options); } + @SuppressWarnings("rawtypes") + @Override + public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception { + transport = super.serverConfigure(transport, format, options); + + MutexTransport mutex = transport.narrow(MutexTransport.class); + if (mutex != null) { + mutex.setSyncOnCommand(true); + } + + return transport; + } + public void setBrokerService(BrokerService brokerService) { this.brokerContext = brokerService.getBrokerContext(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java index d2a7905767..6389f97cfe 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java @@ -75,7 +75,7 @@ public class StompSubscription { } } else if (ackMode == AUTO_ACK) { MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); - protocolConverter.getStompTransport().asyncSendToActiveMQ(ack); + protocolConverter.getStompTransport().sendToActiveMQ(ack); } boolean ignoreTransformation = false; @@ -115,7 +115,7 @@ public class StompSubscription { if (!unconsumedMessage.isEmpty()) { MessageAck ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size()); - protocolConverter.getStompTransport().asyncSendToActiveMQ(ack); + protocolConverter.getStompTransport().sendToActiveMQ(ack); unconsumedMessage.clear(); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java index 8e875e0dc3..ef5b6673e6 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java @@ -29,8 +29,6 @@ public interface StompTransport { public void sendToActiveMQ(Command command); - public void asyncSendToActiveMQ(Command command); - public void sendToStomp(StompFrame command) throws IOException; public X509Certificate[] getPeerCertificates(); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java index b561c1bcb7..4fef5d3e05 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java @@ -16,11 +16,13 @@ */ package org.apache.activemq.transport.stomp; +import java.util.HashMap; import java.util.Map; import org.apache.activemq.broker.BrokerContext; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; +import org.apache.activemq.transport.MutexTransport; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.tcp.TcpTransportFactory; import org.apache.activemq.util.IntrospectionSupport; @@ -28,8 +30,6 @@ import org.apache.activemq.wireformat.WireFormat; /** * A STOMP transport factory - * - * */ public class StompTransportFactory extends TcpTransportFactory implements BrokerServiceAware { @@ -50,6 +50,19 @@ public class StompTransportFactory extends TcpTransportFactory implements Broker this.brokerContext = brokerService.getBrokerContext(); } + @SuppressWarnings("rawtypes") + @Override + public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception { + transport = super.serverConfigure(transport, format, options); + + MutexTransport mutex = transport.narrow(MutexTransport.class); + if (mutex != null) { + mutex.setSyncOnCommand(true); + } + + return transport; + } + @Override protected Transport createInactivityMonitor(Transport transport, WireFormat format) { StompInactivityMonitor monitor = new StompInactivityMonitor(transport, format); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java index 03be046dee..8b38e4c2ed 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java @@ -18,15 +18,11 @@ package org.apache.activemq.transport.stomp; import java.io.IOException; import java.security.cert.X509Certificate; -import java.util.concurrent.ConcurrentLinkedQueue; import javax.jms.JMSException; import org.apache.activemq.broker.BrokerContext; import org.apache.activemq.command.Command; -import org.apache.activemq.thread.DefaultThreadPools; -import org.apache.activemq.thread.Task; -import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFilter; import org.apache.activemq.transport.TransportListener; @@ -50,58 +46,18 @@ public class StompTransportFilter extends TransportFilter implements StompTransp private final ProtocolConverter protocolConverter; private StompInactivityMonitor monitor; private StompWireFormat wireFormat; - private final TaskRunner asyncSendTask; - private final ConcurrentLinkedQueue asyncCommands = new ConcurrentLinkedQueue(); private boolean trace; - private int maxAsyncBatchSize = 25; public StompTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) { super(next); this.protocolConverter = new ProtocolConverter(this, brokerContext); - asyncSendTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() { - public boolean iterate() { - int iterations = 0; - TransportListener listener = transportListener; - if (listener != null) { - while (iterations++ < maxAsyncBatchSize && !asyncCommands.isEmpty()) { - Command command = asyncCommands.poll(); - if (command != null) { - listener.onCommand(command); - } - } - } - return !asyncCommands.isEmpty(); - } - - }, "ActiveMQ StompTransport Async Worker: " + System.identityHashCode(this)); - if (wireFormat instanceof StompWireFormat) { this.wireFormat = (StompWireFormat) wireFormat; } } - public void stop() throws Exception { - asyncSendTask.shutdown(); - - TransportListener listener = transportListener; - if (listener != null) { - Command commands[] = new Command[0]; - asyncCommands.toArray(commands); - asyncCommands.clear(); - for(Command command : commands) { - try { - listener.onCommand(command); - } catch(Exception e) { - break; - } - } - } - - super.stop(); - } - public void oneway(Object o) throws IOException { try { final Command command = (Command) o; @@ -132,15 +88,6 @@ public class StompTransportFilter extends TransportFilter implements StompTransp } } - public void asyncSendToActiveMQ(Command command) { - asyncCommands.offer(command); - try { - asyncSendTask.wakeup(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - public void sendToStomp(StompFrame command) throws IOException { if (trace) { TRACE.trace("Sending: \n" + command); @@ -183,12 +130,4 @@ public class StompTransportFilter extends TransportFilter implements StompTransp public StompWireFormat getWireFormat() { return this.wireFormat; } - - public int getMaxAsyncBatchSize() { - return maxAsyncBatchSize; - } - - public void setMaxAsyncBatchSize(int maxAsyncBatchSize) { - this.maxAsyncBatchSize = maxAsyncBatchSize; - } } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java index 7bd963d242..aea5dd9f67 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java @@ -22,8 +22,10 @@ import java.net.SocketTimeoutException; import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import java.util.UUID; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -1717,6 +1719,67 @@ public class StompTest extends CombinationTestSupport { assertEquals("Number of clients", expected, actual); } + public void testDisconnectDoesNotDeadlockBroker() throws Exception { + for (int i = 0; i < 20; ++i) { + doTestConnectionLeak(); + } + } + + private void doTestConnectionLeak() throws Exception { + stompConnect(); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + boolean gotMessage = false; + boolean gotReceipt = false; + + char[] payload = new char[1024]; + Arrays.fill(payload, 'A'); + + String test = "SEND\n" + + "x-type:DEV-3485\n" + + "x-uuid:" + UUID.randomUUID() + "\n" + + "persistent:true\n" + + "receipt:" + UUID.randomUUID() + "\n" + + "destination:/queue/test.DEV-3485" + + "\n\n" + + new String(payload) + Stomp.NULL; + + frame = "SUBSCRIBE\n" + "destination:/queue/test.DEV-3485\n" + "ack:auto\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + waitForFrameToTakeEffect(); + + stompConnection.sendFrame(test); + + // We only want one of them, to trigger the shutdown and potentially + // see a deadlock. + while (!gotMessage && !gotReceipt) { + frame = stompConnection.receiveFrame(); + + LOG.debug("Received the frame: " + frame); + + if (frame.startsWith("RECEIPT")) { + gotReceipt = true; + } else if(frame.startsWith("MESSAGE")) { + gotMessage = true; + } else { + fail("Received a frame that we were not expecting."); + } + } + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + waitForFrameToTakeEffect(); + + stompConnection.close(); + } + protected void waitForFrameToTakeEffect() throws InterruptedException { // bit of a dirty hack :) // another option would be to force some kind of receipt to be returned diff --git a/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java b/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java index e6b94ca1d0..94d4f7eb7a 100644 --- a/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java +++ b/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java @@ -101,9 +101,4 @@ class StompSocket extends TransportSupport implements WebSocket, StompTransport public StompWireFormat getWireFormat() { return this.wireFormat; } - - @Override - public void asyncSendToActiveMQ(Command command) { - doConsume(command); - } }