git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1182890 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-10-13 14:54:15 +00:00
parent 3b381e7c15
commit f33f32e892
11 changed files with 218 additions and 130 deletions

View File

@ -24,6 +24,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.WireFormatInfo;
@ -56,6 +57,8 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
private final AtomicBoolean inReceive = new AtomicBoolean(false);
private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
private final ReentrantReadWriteLock sendLock = new ReentrantReadWriteLock();
private SchedulerTimerTask writeCheckerTask;
private SchedulerTimerTask readCheckerTask;
@ -140,11 +143,17 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
public void run() {
if (monitorStarted.get()) {
try {
// If we can't get the lock it means another write beat us into the
// send and we don't need to heart beat now.
if (sendLock.writeLock().tryLock()) {
KeepAliveInfo info = new KeepAliveInfo();
info.setResponseRequired(keepAliveResponseRequired);
oneway(info);
doOnewaySend(info);
}
} catch (IOException e) {
onException(e);
} finally {
sendLock.writeLock().unlock();
}
}
};
@ -175,7 +184,6 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
public void run() {
onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: "+next.getRemoteAddress()));
};
});
} else {
if (LOG.isTraceEnabled()) {
@ -195,11 +203,14 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
if (command.getClass() == KeepAliveInfo.class) {
KeepAliveInfo info = (KeepAliveInfo) command;
if (info.isResponseRequired()) {
sendLock.readLock().lock();
try {
info.setResponseRequired(false);
oneway(info);
} catch (IOException e) {
onException(e);
} finally {
sendLock.readLock().unlock();
}
}
} else {
@ -212,39 +223,41 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
}
}
}
synchronized (readChecker) {
transportListener.onCommand(command);
}
}
} finally {
inReceive.set(false);
}
}
public void oneway(Object o) throws IOException {
// Disable inactivity monitoring while processing a command.
// synchronize this method - its not synchronized
// further down the transport stack and gets called by more
// than one thread by this class
synchronized(inSend) {
// To prevent the inactivity monitor from sending a message while we
// are performing a send we take a read lock. The inactivity monitor
// sends its Heart-beat commands under a write lock. This means that
// the MutexTransport is still responsible for synchronizing sends
this.sendLock.readLock().lock();
inSend.set(true);
try {
if( failed.get() ) {
throw new InactivityIOException("Cannot send, channel has already failed: "+next.getRemoteAddress());
}
if (o.getClass() == WireFormatInfo.class) {
synchronized (this) {
processOutboundWireFormatInfo((WireFormatInfo) o);
}
}
next.oneway(o);
doOnewaySend(o);
} finally {
commandSent.set(true);
inSend.set(false);
this.sendLock.readLock().unlock();
}
}
// Must be called under lock, either read or write on sendLock.
private void doOnewaySend(Object command) throws IOException {
if( failed.get() ) {
throw new InactivityIOException("Cannot send, channel has already failed: "+next.getRemoteAddress());
}
if (command.getClass() == WireFormatInfo.class) {
synchronized (this) {
processOutboundWireFormatInfo((WireFormatInfo) command);
}
}
next.oneway(command);
}
public void onException(IOException error) {

View File

@ -17,44 +17,90 @@
package org.apache.activemq.transport;
import java.io.IOException;
import java.util.concurrent.locks.ReentrantLock;
/**
*
* Thread safe Transport Filter that serializes calls to and from the Transport Stack.
*/
public class MutexTransport extends TransportFilter {
private final Object writeMutex = new Object();
private final ReentrantLock wreiteLock = new ReentrantLock();
private boolean syncOnCommand;
public MutexTransport(Transport next) {
super(next);
this.syncOnCommand = false;
}
public MutexTransport(Transport next, boolean syncOnCommand) {
super(next);
this.syncOnCommand = syncOnCommand;
}
@Override
public void onCommand(Object command) {
if (syncOnCommand) {
wreiteLock.lock();
try {
transportListener.onCommand(command);
} finally {
wreiteLock.unlock();
}
} else {
transportListener.onCommand(command);
}
}
@Override
public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
synchronized (writeMutex) {
wreiteLock.lock();
try {
return next.asyncRequest(command, null);
} finally {
wreiteLock.unlock();
}
}
@Override
public void oneway(Object command) throws IOException {
synchronized (writeMutex) {
wreiteLock.lock();
try {
next.oneway(command);
} finally {
wreiteLock.unlock();
}
}
@Override
public Object request(Object command) throws IOException {
synchronized (writeMutex) {
wreiteLock.lock();
try {
return next.request(command);
} finally {
wreiteLock.unlock();
}
}
@Override
public Object request(Object command, int timeout) throws IOException {
synchronized (writeMutex) {
wreiteLock.lock();
try {
return next.request(command, timeout);
} finally {
wreiteLock.unlock();
}
}
@Override
public String toString() {
return next.toString();
}
public boolean isSyncOnCommand() {
return syncOnCommand;
}
public void setSyncOnCommand(boolean syncOnCommand) {
this.syncOnCommand = syncOnCommand;
}
}

View File

@ -166,16 +166,7 @@ public class ProtocolConverter {
command.setResponseRequired(true);
resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
}
stompTransport.asyncSendToActiveMQ(command);
}
protected void asyncSendToActiveMQ(Command command, ResponseHandler handler) {
command.setCommandId(generateCommandId());
if (handler != null) {
command.setResponseRequired(true);
resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
}
stompTransport.asyncSendToActiveMQ(command);
stompTransport.sendToActiveMQ(command);
}
protected void sendToStomp(StompFrame command) throws IOException {
@ -301,7 +292,7 @@ public class ProtocolConverter {
}
message.onSend();
asyncSendToActiveMQ(message, createResponseHandler(command));
sendToActiveMQ(message, createResponseHandler(command));
}
protected void onStompNack(StompFrame command) throws ProtocolException {
@ -338,7 +329,7 @@ public class ProtocolConverter {
if (sub != null) {
MessageAck ack = sub.onStompMessageNack(messageId, activemqTx);
if (ack != null) {
asyncSendToActiveMQ(ack, createResponseHandler(command));
sendToActiveMQ(ack, createResponseHandler(command));
} else {
throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]");
}
@ -377,7 +368,7 @@ public class ProtocolConverter {
if (sub != null) {
MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
if (ack != null) {
asyncSendToActiveMQ(ack, createResponseHandler(command));
sendToActiveMQ(ack, createResponseHandler(command));
acked = true;
}
}
@ -391,7 +382,7 @@ public class ProtocolConverter {
for (StompSubscription sub : subscriptionsByConsumerId.values()) {
MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
if (ack != null) {
asyncSendToActiveMQ(ack, createResponseHandler(command));
sendToActiveMQ(ack, createResponseHandler(command));
acked = true;
break;
}
@ -426,7 +417,7 @@ public class ProtocolConverter {
tx.setTransactionId(activemqTx);
tx.setType(TransactionInfo.BEGIN);
asyncSendToActiveMQ(tx, createResponseHandler(command));
sendToActiveMQ(tx, createResponseHandler(command));
}
protected void onStompCommit(StompFrame command) throws ProtocolException {
@ -453,7 +444,7 @@ public class ProtocolConverter {
tx.setTransactionId(activemqTx);
tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
asyncSendToActiveMQ(tx, createResponseHandler(command));
sendToActiveMQ(tx, createResponseHandler(command));
}
protected void onStompAbort(StompFrame command) throws ProtocolException {
@ -482,7 +473,7 @@ public class ProtocolConverter {
tx.setTransactionId(activemqTx);
tx.setType(TransactionInfo.ROLLBACK);
asyncSendToActiveMQ(tx, createResponseHandler(command));
sendToActiveMQ(tx, createResponseHandler(command));
}
protected void onStompSubscribe(StompFrame command) throws ProtocolException {
@ -550,7 +541,7 @@ public class ProtocolConverter {
// dispatch can beat the receipt so send it early
sendReceipt(command);
asyncSendToActiveMQ(consumerInfo, null);
sendToActiveMQ(consumerInfo, null);
}
protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
@ -579,7 +570,7 @@ public class ProtocolConverter {
info.setClientId(durable);
info.setSubscriptionName(durable);
info.setConnectionId(connectionId);
asyncSendToActiveMQ(info, createResponseHandler(command));
sendToActiveMQ(info, createResponseHandler(command));
return;
}
@ -587,7 +578,7 @@ public class ProtocolConverter {
StompSubscription sub = this.subscriptions.remove(subscriptionId);
if (sub != null) {
asyncSendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
return;
}
@ -598,7 +589,7 @@ public class ProtocolConverter {
for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
StompSubscription sub = iter.next();
if (destination != null && destination.equals(sub.getDestination())) {
asyncSendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
iter.remove();
return;
}
@ -721,8 +712,8 @@ public class ProtocolConverter {
protected void onStompDisconnect(StompFrame command) throws ProtocolException {
checkConnected();
asyncSendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command));
asyncSendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command));
sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
connected.set(false);
}
@ -787,7 +778,7 @@ public class ProtocolConverter {
ActiveMQDestination rc = tempDestinations.get(name);
if( rc == null ) {
rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId());
asyncSendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
tempDestinations.put(name, rc);
}
return rc;
@ -797,7 +788,7 @@ public class ProtocolConverter {
ActiveMQDestination rc = tempDestinations.get(name);
if( rc == null ) {
rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId());
asyncSendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
tempDestinations.put(name, rc);
tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name);
}

View File

@ -21,6 +21,7 @@ import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import javax.net.ServerSocketFactory;
@ -29,6 +30,7 @@ import javax.net.SocketFactory;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.nio.NIOTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransport;
@ -61,6 +63,19 @@ public class StompNIOTransportFactory extends NIOTransportFactory implements Bro
return new StompNIOTransport(wf, socketFactory, location, localLocation);
}
@SuppressWarnings("rawtypes")
@Override
public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
transport = super.serverConfigure(transport, format, options);
MutexTransport mutex = transport.narrow(MutexTransport.class);
if (mutex != null) {
mutex.setSyncOnCommand(true);
}
return transport;
}
@SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
transport = new StompTransportFilter(transport, format, brokerContext);

View File

@ -16,11 +16,13 @@
*/
package org.apache.activemq.transport.stomp;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.tcp.SslTransportFactory;
import org.apache.activemq.util.IntrospectionSupport;
@ -46,6 +48,19 @@ public class StompSslTransportFactory extends SslTransportFactory implements Bro
return super.compositeConfigure(transport, format, options);
}
@SuppressWarnings("rawtypes")
@Override
public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
transport = super.serverConfigure(transport, format, options);
MutexTransport mutex = transport.narrow(MutexTransport.class);
if (mutex != null) {
mutex.setSyncOnCommand(true);
}
return transport;
}
public void setBrokerService(BrokerService brokerService) {
this.brokerContext = brokerService.getBrokerContext();
}

View File

@ -75,7 +75,7 @@ public class StompSubscription {
}
} else if (ackMode == AUTO_ACK) {
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
protocolConverter.getStompTransport().asyncSendToActiveMQ(ack);
protocolConverter.getStompTransport().sendToActiveMQ(ack);
}
boolean ignoreTransformation = false;
@ -115,7 +115,7 @@ public class StompSubscription {
if (!unconsumedMessage.isEmpty()) {
MessageAck ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size());
protocolConverter.getStompTransport().asyncSendToActiveMQ(ack);
protocolConverter.getStompTransport().sendToActiveMQ(ack);
unconsumedMessage.clear();
}
}

View File

@ -29,8 +29,6 @@ public interface StompTransport {
public void sendToActiveMQ(Command command);
public void asyncSendToActiveMQ(Command command);
public void sendToStomp(StompFrame command) throws IOException;
public X509Certificate[] getPeerCertificates();

View File

@ -16,11 +16,13 @@
*/
package org.apache.activemq.transport.stomp;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.util.IntrospectionSupport;
@ -28,8 +30,6 @@ import org.apache.activemq.wireformat.WireFormat;
/**
* A <a href="http://stomp.codehaus.org/">STOMP</a> transport factory
*
*
*/
public class StompTransportFactory extends TcpTransportFactory implements BrokerServiceAware {
@ -50,6 +50,19 @@ public class StompTransportFactory extends TcpTransportFactory implements Broker
this.brokerContext = brokerService.getBrokerContext();
}
@SuppressWarnings("rawtypes")
@Override
public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
transport = super.serverConfigure(transport, format, options);
MutexTransport mutex = transport.narrow(MutexTransport.class);
if (mutex != null) {
mutex.setSyncOnCommand(true);
}
return transport;
}
@Override
protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
StompInactivityMonitor monitor = new StompInactivityMonitor(transport, format);

View File

@ -18,15 +18,11 @@ package org.apache.activemq.transport.stomp;
import java.io.IOException;
import java.security.cert.X509Certificate;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.jms.JMSException;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.command.Command;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportListener;
@ -50,58 +46,18 @@ public class StompTransportFilter extends TransportFilter implements StompTransp
private final ProtocolConverter protocolConverter;
private StompInactivityMonitor monitor;
private StompWireFormat wireFormat;
private final TaskRunner asyncSendTask;
private final ConcurrentLinkedQueue<Command> asyncCommands = new ConcurrentLinkedQueue<Command>();
private boolean trace;
private int maxAsyncBatchSize = 25;
public StompTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
super(next);
this.protocolConverter = new ProtocolConverter(this, brokerContext);
asyncSendTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
public boolean iterate() {
int iterations = 0;
TransportListener listener = transportListener;
if (listener != null) {
while (iterations++ < maxAsyncBatchSize && !asyncCommands.isEmpty()) {
Command command = asyncCommands.poll();
if (command != null) {
listener.onCommand(command);
}
}
}
return !asyncCommands.isEmpty();
}
}, "ActiveMQ StompTransport Async Worker: " + System.identityHashCode(this));
if (wireFormat instanceof StompWireFormat) {
this.wireFormat = (StompWireFormat) wireFormat;
}
}
public void stop() throws Exception {
asyncSendTask.shutdown();
TransportListener listener = transportListener;
if (listener != null) {
Command commands[] = new Command[0];
asyncCommands.toArray(commands);
asyncCommands.clear();
for(Command command : commands) {
try {
listener.onCommand(command);
} catch(Exception e) {
break;
}
}
}
super.stop();
}
public void oneway(Object o) throws IOException {
try {
final Command command = (Command) o;
@ -132,15 +88,6 @@ public class StompTransportFilter extends TransportFilter implements StompTransp
}
}
public void asyncSendToActiveMQ(Command command) {
asyncCommands.offer(command);
try {
asyncSendTask.wakeup();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void sendToStomp(StompFrame command) throws IOException {
if (trace) {
TRACE.trace("Sending: \n" + command);
@ -183,12 +130,4 @@ public class StompTransportFilter extends TransportFilter implements StompTransp
public StompWireFormat getWireFormat() {
return this.wireFormat;
}
public int getMaxAsyncBatchSize() {
return maxAsyncBatchSize;
}
public void setMaxAsyncBatchSize(int maxAsyncBatchSize) {
this.maxAsyncBatchSize = maxAsyncBatchSize;
}
}

View File

@ -22,8 +22,10 @@ import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -1717,6 +1719,67 @@ public class StompTest extends CombinationTestSupport {
assertEquals("Number of clients", expected, actual);
}
public void testDisconnectDoesNotDeadlockBroker() throws Exception {
for (int i = 0; i < 20; ++i) {
doTestConnectionLeak();
}
}
private void doTestConnectionLeak() throws Exception {
stompConnect();
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
boolean gotMessage = false;
boolean gotReceipt = false;
char[] payload = new char[1024];
Arrays.fill(payload, 'A');
String test = "SEND\n" +
"x-type:DEV-3485\n" +
"x-uuid:" + UUID.randomUUID() + "\n" +
"persistent:true\n" +
"receipt:" + UUID.randomUUID() + "\n" +
"destination:/queue/test.DEV-3485" +
"\n\n" +
new String(payload) + Stomp.NULL;
frame = "SUBSCRIBE\n" + "destination:/queue/test.DEV-3485\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
waitForFrameToTakeEffect();
stompConnection.sendFrame(test);
// We only want one of them, to trigger the shutdown and potentially
// see a deadlock.
while (!gotMessage && !gotReceipt) {
frame = stompConnection.receiveFrame();
LOG.debug("Received the frame: " + frame);
if (frame.startsWith("RECEIPT")) {
gotReceipt = true;
} else if(frame.startsWith("MESSAGE")) {
gotMessage = true;
} else {
fail("Received a frame that we were not expecting.");
}
}
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
waitForFrameToTakeEffect();
stompConnection.close();
}
protected void waitForFrameToTakeEffect() throws InterruptedException {
// bit of a dirty hack :)
// another option would be to force some kind of receipt to be returned

View File

@ -101,9 +101,4 @@ class StompSocket extends TransportSupport implements WebSocket, StompTransport
public StompWireFormat getWireFormat() {
return this.wireFormat;
}
@Override
public void asyncSendToActiveMQ(Command command) {
doConsume(command);
}
}