From ae61847d025e76ba23876e1ba3f05f3d26f8a741 Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Sat, 12 Jan 2013 18:13:27 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-4248 Add expanded transmit callback interface so that a failure to transmit can be distinguished from normal operation and allow for no further attempts at dispatch fixing the current NPE when async dispatch is enabled. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1432487 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/broker/TransportConnection.java | 124 ++++++++++++++++-- .../broker/region/PrefetchSubscription.java | 47 +++++-- .../broker/region/TopicSubscription.java | 30 ++++- .../cursors/StoreDurableSubscriberCursor.java | 27 ++-- .../activemq/command/MessageDispatch.java | 14 +- .../activemq/transport/TransmitCallback.java | 25 ++++ 6 files changed, 224 insertions(+), 43 deletions(-) create mode 100644 activemq-client/src/main/java/org/apache/activemq/transport/TransmitCallback.java diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index 68a8e08147..45538b4bdf 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -36,10 +36,44 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.transaction.xa.XAResource; + import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.region.ConnectionStatistics; import org.apache.activemq.broker.region.RegionBroker; -import org.apache.activemq.command.*; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.BrokerInfo; +import org.apache.activemq.command.Command; +import org.apache.activemq.command.CommandTypes; +import org.apache.activemq.command.ConnectionControl; +import org.apache.activemq.command.ConnectionError; +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerControl; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.ControlCommand; +import org.apache.activemq.command.DataArrayResponse; +import org.apache.activemq.command.DestinationInfo; +import org.apache.activemq.command.ExceptionResponse; +import org.apache.activemq.command.FlushCommand; +import org.apache.activemq.command.IntegerResponse; +import org.apache.activemq.command.KeepAliveInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.MessageDispatchNotification; +import org.apache.activemq.command.MessagePull; +import org.apache.activemq.command.ProducerAck; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.RemoveSubscriptionInfo; +import org.apache.activemq.command.Response; +import org.apache.activemq.command.SessionId; +import org.apache.activemq.command.SessionInfo; +import org.apache.activemq.command.ShutdownInfo; +import org.apache.activemq.command.TransactionId; +import org.apache.activemq.command.TransactionInfo; +import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.network.DemandForwardingBridge; import org.apache.activemq.network.MBeanNetworkListener; import org.apache.activemq.network.NetworkBridgeConfiguration; @@ -57,12 +91,12 @@ import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transaction.Transaction; import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.ResponseCorrelator; +import org.apache.activemq.transport.TransmitCallback; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportDisposedIOException; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.MarshallingSupport; -import org.apache.activemq.util.ServiceSupport; import org.apache.activemq.util.URISupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -174,6 +208,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { * * @return size of dispatch queue */ + @Override public int getDispatchQueueSize() { synchronized (dispatchQueue) { return dispatchQueue.size(); @@ -207,8 +242,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } private boolean expected(IOException e) { - return isStomp() && - ((e instanceof SocketException && e.getMessage().indexOf("reset") != -1) || e instanceof EOFException); + return isStomp() && ((e instanceof SocketException && e.getMessage().indexOf("reset") != -1) || e instanceof EOFException); } private boolean isStomp() { @@ -221,6 +255,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { * service exception closes a socket, we should not tie up broker threads * since client sockets may hang or cause deadlocks. */ + @Override public void serviceExceptionAsync(final IOException e) { if (asyncException.compareAndSet(false, true)) { new Thread("Async Exception Handler") { @@ -237,6 +272,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { * if: the client is closing or broker is closing. Otherwise, the connection * error transmitted to the client before stopping it's transport. */ + @Override public void serviceException(Throwable e) { // are we a transport exception such as not being able to dispatch // synchronously to a transport @@ -282,6 +318,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } } + @Override public Response service(Command command) { MDC.put("activemq.connector", connector.getUri().toString()); Response response = null; @@ -324,30 +361,36 @@ public class TransportConnection implements Connection, Task, CommandVisitor { return response; } + @Override public Response processKeepAlive(KeepAliveInfo info) throws Exception { return null; } + @Override public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception { broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info); return null; } + @Override public Response processWireFormat(WireFormatInfo info) throws Exception { wireFormatInfo = info; protocolVersion.set(info.getVersion()); return null; } + @Override public Response processShutdown(ShutdownInfo info) throws Exception { stopAsync(); return null; } + @Override public Response processFlush(FlushCommand command) throws Exception { return null; } + @Override public Response processBeginTransaction(TransactionInfo info) throws Exception { TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); context = null; @@ -365,6 +408,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { return null; } + @Override public Response processEndTransaction(TransactionInfo info) throws Exception { // No need to do anything. This packet is just sent by the client // make sure he is synced with the server as commit command could @@ -372,6 +416,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { return null; } + @Override public Response processPrepareTransaction(TransactionInfo info) throws Exception { TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); context = null; @@ -403,6 +448,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } } + @Override public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); context = cs.getContext(); @@ -411,6 +457,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { return null; } + @Override public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); context = cs.getContext(); @@ -419,6 +466,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { return null; } + @Override public Response processRollbackTransaction(TransactionInfo info) throws Exception { TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); context = cs.getContext(); @@ -427,6 +475,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { return null; } + @Override public Response processForgetTransaction(TransactionInfo info) throws Exception { TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); context = cs.getContext(); @@ -434,6 +483,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { return null; } + @Override public Response processRecoverTransactions(TransactionInfo info) throws Exception { TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); context = cs.getContext(); @@ -441,6 +491,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { return new DataArrayResponse(preparedTransactions); } + @Override public Response processMessage(Message messageSend) throws Exception { ProducerId producerId = messageSend.getProducerId(); ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId); @@ -450,6 +501,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { return null; } + @Override public Response processMessageAck(MessageAck ack) throws Exception { ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId()); if (consumerExchange != null) { @@ -458,15 +510,18 @@ public class TransportConnection implements Connection, Task, CommandVisitor { return null; } + @Override public Response processMessagePull(MessagePull pull) throws Exception { return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull); } + @Override public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception { broker.processDispatchNotification(notification); return null; } + @Override public Response processAddDestination(DestinationInfo info) throws Exception { TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); broker.addDestinationInfo(cs.getContext(), info); @@ -476,6 +531,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { return null; } + @Override public Response processRemoveDestination(DestinationInfo info) throws Exception { TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); broker.removeDestinationInfo(cs.getContext(), info); @@ -485,6 +541,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { return null; } + @Override public Response processAddProducer(ProducerInfo info) throws Exception { SessionId sessionId = info.getProducerId().getParentId(); ConnectionId connectionId = sessionId.getParentId(); @@ -517,6 +574,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { return null; } + @Override public Response processRemoveProducer(ProducerId id) throws Exception { SessionId sessionId = id.getParentId(); ConnectionId connectionId = sessionId.getParentId(); @@ -535,6 +593,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { return null; } + @Override public Response processAddConsumer(ConsumerInfo info) throws Exception { SessionId sessionId = info.getConsumerId().getParentId(); ConnectionId connectionId = sessionId.getParentId(); @@ -569,6 +628,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { return null; } + @Override public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception { SessionId sessionId = id.getParentId(); ConnectionId connectionId = sessionId.getParentId(); @@ -593,6 +653,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { return null; } + @Override public Response processAddSession(SessionInfo info) throws Exception { ConnectionId connectionId = info.getSessionId().getParentId(); TransportConnectionState cs = lookupConnectionState(connectionId); @@ -609,6 +670,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { return null; } + @Override public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception { ConnectionId connectionId = id.getParentId(); TransportConnectionState cs = lookupConnectionState(connectionId); @@ -642,6 +704,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { return null; } + @Override public Response processAddConnection(ConnectionInfo info) throws Exception { // Older clients should have been defaulting this field to true.. but // they were not. @@ -728,6 +791,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { return null; } + @Override public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws InterruptedException { LOG.debug("remove connection id: " + id); @@ -776,15 +840,18 @@ public class TransportConnection implements Connection, Task, CommandVisitor { return null; } + @Override public Response processProducerAck(ProducerAck ack) throws Exception { // A broker should not get ProducerAck messages. return null; } + @Override public Connector getConnector() { return connector; } + @Override public void dispatchSync(Command message) { try { processDispatch(message); @@ -793,6 +860,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } } + @Override public void dispatchAsync(Command message) { if (!stopping.get()) { if (taskRunner == null) { @@ -810,17 +878,17 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } else { if (message.isMessageDispatch()) { MessageDispatch md = (MessageDispatch) message; - Runnable sub = md.getTransmitCallback(); + TransmitCallback sub = md.getTransmitCallback(); broker.postProcessDispatch(md); if (sub != null) { - sub.run(); + sub.onFailure(); } } } } protected void processDispatch(Command command) throws IOException { - final MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null); + MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null); try { if (!stopping.get()) { if (messageDispatch != null) { @@ -828,17 +896,27 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } dispatch(command); } - } finally { + } catch (Throwable e) { if (messageDispatch != null) { - Runnable sub = messageDispatch.getTransmitCallback(); + TransmitCallback sub = messageDispatch.getTransmitCallback(); broker.postProcessDispatch(messageDispatch); if (sub != null) { - sub.run(); + sub.onFailure(); + } + messageDispatch = null; + } + } finally { + if (messageDispatch != null) { + TransmitCallback sub = messageDispatch.getTransmitCallback(); + broker.postProcessDispatch(messageDispatch); + if (sub != null) { + sub.onSuccess(); } } } } + @Override public boolean iterate() { try { if (pendingStop || stopping.get()) { @@ -877,6 +955,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { /** * Returns the statistics for this connection */ + @Override public ConnectionStatistics getStatistics() { return statistics; } @@ -889,10 +968,12 @@ public class TransportConnection implements Connection, Task, CommandVisitor { this.messageAuthorizationPolicy = messageAuthorizationPolicy; } + @Override public boolean isManageable() { return manageable; } + @Override public void start() throws Exception { try { synchronized (this) { @@ -931,6 +1012,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } } + @Override public void stop() throws Exception { // do not stop task the task runner factories (taskRunnerFactory, stopTaskRunnerFactory) // as their lifecycle is handled elsewhere @@ -949,6 +1031,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } try { stopTaskRunnerFactory.execute(new Runnable() { + @Override public void run() { try { Thread.sleep(waitTime); @@ -985,6 +1068,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } try { stopTaskRunnerFactory.execute(new Runnable() { + @Override public void run() { serviceLock.writeLock().lock(); try { @@ -1039,10 +1123,10 @@ public class TransportConnection implements Connection, Task, CommandVisitor { Command command = iter.next(); if (command.isMessageDispatch()) { MessageDispatch md = (MessageDispatch) command; - Runnable sub = md.getTransmitCallback(); + TransmitCallback sub = md.getTransmitCallback(); broker.postProcessDispatch(md); if (sub != null) { - sub.run(); + sub.onFailure(); } } } @@ -1109,6 +1193,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { /** * @return true if the Connection is slow */ + @Override public boolean isSlow() { return slow; } @@ -1132,6 +1217,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { /** * @return if after being marked, the Connection is still writing */ + @Override public boolean isBlocked() { return blocked; } @@ -1139,6 +1225,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { /** * @return true if the Connection is connected */ + @Override public boolean isConnected() { return connected; } @@ -1160,6 +1247,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { /** * @return true if the Connection is active */ + @Override public boolean isActive() { return active; } @@ -1178,10 +1266,12 @@ public class TransportConnection implements Connection, Task, CommandVisitor { return starting; } + @Override public synchronized boolean isNetworkConnection() { return networkConnection; } + @Override public boolean isFaultTolerantConnection() { return this.faultTolerantConnection; } @@ -1201,9 +1291,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor { this.pendingStop = pendingStop; } + @Override public Response processBrokerInfo(BrokerInfo info) { if (info.isSlaveBroker()) { - BrokerService bService = connector.getBrokerService(); LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: " + info.getBrokerName()); } else if (info.isNetworkConnection() && info.isDuplexConnection()) { // so this TransportConnection is the rear end of a network bridge @@ -1291,10 +1381,12 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } } + @Override public String getRemoteAddress() { return transport.getRemoteAddress(); } + @Override public String getConnectionId() { List connectionStates = listConnectionStates(); for (TransportConnectionState cs : connectionStates) { @@ -1306,6 +1398,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { return null; } + @Override public void updateClient(ConnectionControl control) { if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null && this.wireFormatInfo.getVersion() >= 6) { @@ -1388,6 +1481,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { return protocolVersion.get(); } + @Override public Response processControlCommand(ControlCommand command) throws Exception { String control = command.getCommand(); if (control != null && control.equals("shutdown")) { @@ -1396,10 +1490,12 @@ public class TransportConnection implements Connection, Task, CommandVisitor { return null; } + @Override public Response processMessageDispatch(MessageDispatch dispatch) throws Exception { return null; } + @Override public Response processConnectionControl(ConnectionControl control) throws Exception { if (control != null) { faultTolerantConnection = control.isFaultTolerant(); @@ -1407,10 +1503,12 @@ public class TransportConnection implements Connection, Task, CommandVisitor { return null; } + @Override public Response processConnectionError(ConnectionError error) throws Exception { return null; } + @Override public Response processConsumerControl(ConsumerControl control) throws Exception { ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(control.getConsumerId()); broker.processConsumerControl(consumerExchange, control); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 4d2b78efba..c9189df727 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -24,7 +24,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import org.apache.activemq.broker.Broker; @@ -43,6 +42,7 @@ import org.apache.activemq.command.MessagePull; import org.apache.activemq.command.Response; import org.apache.activemq.thread.Scheduler; import org.apache.activemq.transaction.Synchronization; +import org.apache.activemq.transport.TransmitCallback; import org.apache.activemq.usage.SystemUsage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,6 +88,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { /** * Allows a message to be pulled on demand by a client */ + @Override public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception { // The slave should not deliver pull messages. // TODO: when the slave becomes a master, He should send a NULL message to all the @@ -143,6 +144,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { } } + @Override public void add(MessageReference node) throws Exception { synchronized (pendingLock) { // The destination may have just been removed... @@ -160,6 +162,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { dispatchPending(); } + @Override public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception { synchronized(pendingLock) { try { @@ -189,6 +192,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { + mdn.getConsumerId() + " on " + mdn.getDestination().getPhysicalName()); } + @Override public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception { // Handle the standard acknowledgment case. boolean callDispatchMatched = false; @@ -305,7 +309,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { Destination nodeDest = (Destination) node.getRegionDestination(); if (node.isExpired()) { if (broker.isExpired(node)) { - Destination regionDestination = (Destination) nodeDest; + Destination regionDestination = nodeDest; regionDestination.messageExpired(context, this, node); } iter.remove(); @@ -500,6 +504,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { broker.getRoot().sendToDeadLetterQueue(context, node, this); } + @Override public int getInFlightSize() { return dispatched.size(); } @@ -509,6 +514,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { * * @return */ + @Override public boolean isFull() { return dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize(); } @@ -516,6 +522,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { /** * @return true when 60% or more room is left for dispatching messages */ + @Override public boolean isLowWaterMark() { return (dispatched.size() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4); } @@ -523,6 +530,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { /** * @return true when 10% or less room is left for dispatching messages */ + @Override public boolean isHighWaterMark() { return (dispatched.size() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9); } @@ -532,22 +540,27 @@ public abstract class PrefetchSubscription extends AbstractSubscription { return info.getPrefetchSize() + prefetchExtension.get() - dispatched.size(); } + @Override public int getPendingQueueSize() { return pending.size(); } + @Override public int getDispatchedQueueSize() { return dispatched.size(); } + @Override public long getDequeueCounter() { return dequeueCounter; } + @Override public long getDispatchedCounter() { return dispatchCounter; } + @Override public long getEnqueueCounter() { return enqueueCounter; } @@ -613,8 +626,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { setPendingBatchSize(pending, numberToDispatch); int count = 0; pending.reset(); - while (pending.hasNext() && !isFull() - && count < numberToDispatch) { + while (pending.hasNext() && !isFull() && count < numberToDispatch) { MessageReference node = pending.next(); if (node == null) { break; @@ -683,15 +695,29 @@ public abstract class PrefetchSubscription extends AbstractSubscription { } } if (info.isDispatchAsync()) { - md.setTransmitCallback(new Runnable() { + md.setTransmitCallback(new TransmitCallback() { - public void run() { - // Since the message gets queued up in async dispatch, - // we don't want to - // decrease the reference count until it gets put on the - // wire. + @Override + public void onSuccess() { + // Since the message gets queued up in async dispatch, we don't want to + // decrease the reference count until it gets put on the wire. onDispatch(node, message); } + + @Override + public void onFailure() { + Destination nodeDest = (Destination) node.getRegionDestination(); + if (nodeDest != null) { + if (node != QueueMessageReference.NULL_MESSAGE) { + nodeDest.getDestinationStatistics().getDispatched().increment(); + nodeDest.getDestinationStatistics().getInflight().increment(); + if (LOG.isTraceEnabled()) { + LOG.trace(info.getConsumerId() + " failed to dispatch: " + message.getMessageId() + " - " + + message.getDestination() + ", dispatched: " + dispatchCounter + ", inflight: " + dispatched.size()); + } + } + } + } }); context.getConnection().dispatchAsync(md); } else { @@ -728,6 +754,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { * * @param newPrefetch */ + @Override public void updateConsumerPrefetch(int newPrefetch) { if (context != null && context.getConnection() != null && context.getConnection().isManageable()) { ConsumerControl cc = new ConsumerControl(); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index 90ed351d00..4474f2a234 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -41,6 +41,7 @@ import org.apache.activemq.command.MessagePull; import org.apache.activemq.command.Response; import org.apache.activemq.thread.Scheduler; import org.apache.activemq.transaction.Synchronization; +import org.apache.activemq.transport.TransmitCallback; import org.apache.activemq.usage.SystemUsage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,6 +97,7 @@ public class TopicSubscription extends AbstractSubscription { this.active=true; } + @Override public void add(MessageReference node) throws Exception { if (isDuplicate(node)) { return; @@ -236,6 +238,7 @@ public class TopicSubscription extends AbstractSubscription { } } + @Override public void processMessageDispatchNotification(MessageDispatchNotification mdn) { synchronized (matchedListMutex) { try { @@ -256,6 +259,7 @@ public class TopicSubscription extends AbstractSubscription { } } + @Override public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception { // Handle the standard acknowledgment case. if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) { @@ -299,6 +303,7 @@ public class TopicSubscription extends AbstractSubscription { throw new JMSException("Invalid acknowledgment: " + ack); } + @Override public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception { // The slave should not deliver pull messages. @@ -320,6 +325,7 @@ public class TopicSubscription extends AbstractSubscription { if (pull.getTimeout() > 0) { scheduler.executeAfterDelay(new Runnable() { + @Override public void run() { pullTimeout(); } @@ -346,10 +352,12 @@ public class TopicSubscription extends AbstractSubscription { } } + @Override public int getPendingQueueSize() { return matched(); } + @Override public int getDispatchedQueueSize() { return (int)(dispatchedCounter.get() - dequeueCounter.get()); } @@ -358,14 +366,17 @@ public class TopicSubscription extends AbstractSubscription { return maximumPendingMessages; } + @Override public long getDispatchedCounter() { return dispatchedCounter.get(); } + @Override public long getEnqueueCounter() { return enqueueCounter.get(); } + @Override public long getDequeueCounter() { return dequeueCounter.get(); } @@ -445,10 +456,12 @@ public class TopicSubscription extends AbstractSubscription { // Implementation methods // ------------------------------------------------------------------------- + @Override public boolean isFull() { return getDispatchedQueueSize() >= info.getPrefetchSize() && !prefetchWindowOpen.get(); } + @Override public int getInFlightSize() { return getDispatchedQueueSize(); } @@ -456,6 +469,7 @@ public class TopicSubscription extends AbstractSubscription { /** * @return true when 60% or more room is left for dispatching messages */ + @Override public boolean isLowWaterMark() { return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4); } @@ -463,6 +477,7 @@ public class TopicSubscription extends AbstractSubscription { /** * @return true when 10% or less room is left for dispatching messages */ + @Override public boolean isHighWaterMark() { return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9); } @@ -507,6 +522,7 @@ public class TopicSubscription extends AbstractSubscription { * * @param newPrefetch */ + @Override public void updateConsumerPrefetch(int newPrefetch) { if (context != null && context.getConnection() != null && context.getConnection().isManageable()) { ConsumerControl cc = new ConsumerControl(); @@ -567,9 +583,18 @@ public class TopicSubscription extends AbstractSubscription { } if (info.isDispatchAsync()) { if (node != null) { - md.setTransmitCallback(new Runnable() { + md.setTransmitCallback(new TransmitCallback() { + @Override - public void run() { + public void onSuccess() { + Destination regionDestination = (Destination) node.getRegionDestination(); + regionDestination.getDestinationStatistics().getDispatched().increment(); + regionDestination.getDestinationStatistics().getInflight().increment(); + node.decrementReferenceCount(); + } + + @Override + public void onFailure() { Destination regionDestination = (Destination) node.getRegionDestination(); regionDestination.getDestinationStatistics().getDispatched().increment(); regionDestination.getDestinationStatistics().getInflight().increment(); @@ -612,6 +637,7 @@ public class TopicSubscription extends AbstractSubscription { + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded(); } + @Override public void destroy() { this.active=false; synchronized (matchedListMutex) { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java index 608b88123b..89e564737f 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; + import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; @@ -36,8 +37,6 @@ import org.slf4j.LoggerFactory; /** * persist pending messages pending message (messages awaiting dispatch to a * consumer) cursor - * - * */ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { @@ -50,6 +49,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { private PendingMessageCursor currentCursor; private final DurableTopicSubscription subscription; private boolean immediatePriorityDispatch = true; + /** * @param broker Broker for this cursor * @param clientId clientId for this cursor @@ -67,7 +67,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { } else { this.nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages); } - + this.nonPersistent.setMaxBatchSize(maxBatchSize); this.nonPersistent.setSystemUsage(systemUsage); this.storePrefetches.add(this.nonPersistent); @@ -82,7 +82,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { if (!isStarted()) { super.start(); for (PendingMessageCursor tsp : storePrefetches) { - tsp.setMessageAudit(getMessageAudit()); + tsp.setMessageAudit(getMessageAudit()); tsp.start(); } } @@ -108,7 +108,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { /** * Add a destination - * + * * @param context * @param destination * @throws Exception @@ -134,7 +134,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { /** * remove a destination - * + * * @param context * @param destination * @throws Exception @@ -173,7 +173,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { /** * Informs the Broker if the subscription needs to intervention to recover * it's state e.g. DurableTopicSubscriber may do - * + * * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor * @return true if recovery required */ @@ -290,6 +290,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { @Override public synchronized void release() { + this.currentCursor = null; for (PendingMessageCursor storePrefetch : storePrefetches) { storePrefetch.release(); } @@ -326,7 +327,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { tsp.setSystemUsage(usageManager); } } - + @Override public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) { super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark); @@ -334,7 +335,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { cursor.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark); } } - + @Override public void setMaxProducersToAudit(int maxProducersToAudit) { super.setMaxProducersToAudit(maxProducersToAudit); @@ -350,7 +351,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { cursor.setMaxAuditDepth(maxAuditDepth); } } - + @Override public void setEnableAudit(boolean enableAudit) { super.setEnableAudit(enableAudit); @@ -358,7 +359,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { cursor.setEnableAudit(enableAudit); } } - + @Override public void setUseCache(boolean useCache) { super.setUseCache(useCache); @@ -366,7 +367,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { cursor.setUseCache(useCache); } } - + protected synchronized PendingMessageCursor getNextCursor() throws Exception { if (currentCursor == null || currentCursor.isEmpty()) { currentCursor = null; @@ -384,7 +385,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { } return currentCursor; } - + @Override public String toString() { return "StoreDurableSubscriber(" + clientId + ":" + subscriberName + ")"; diff --git a/activemq-client/src/main/java/org/apache/activemq/command/MessageDispatch.java b/activemq-client/src/main/java/org/apache/activemq/command/MessageDispatch.java index fe9b11a63a..da2c0bfcc5 100755 --- a/activemq-client/src/main/java/org/apache/activemq/command/MessageDispatch.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/MessageDispatch.java @@ -17,11 +17,12 @@ package org.apache.activemq.command; import org.apache.activemq.state.CommandVisitor; +import org.apache.activemq.transport.TransmitCallback; /** - * + * * @openwire:marshaller code="21" - * + * */ public class MessageDispatch extends BaseCommand { @@ -34,13 +35,15 @@ public class MessageDispatch extends BaseCommand { protected transient long deliverySequenceId; protected transient Object consumer; - protected transient Runnable transmitCallback; + protected transient TransmitCallback transmitCallback; protected transient Throwable rollbackCause; + @Override public byte getDataStructureType() { return DATA_STRUCTURE_TYPE; } + @Override public boolean isMessageDispatch() { return true; } @@ -105,15 +108,16 @@ public class MessageDispatch extends BaseCommand { this.consumer = consumer; } + @Override public Response visit(CommandVisitor visitor) throws Exception { return visitor.processMessageDispatch(this); } - public Runnable getTransmitCallback() { + public TransmitCallback getTransmitCallback() { return transmitCallback; } - public void setTransmitCallback(Runnable transmitCallback) { + public void setTransmitCallback(TransmitCallback transmitCallback) { this.transmitCallback = transmitCallback; } diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/TransmitCallback.java b/activemq-client/src/main/java/org/apache/activemq/transport/TransmitCallback.java new file mode 100644 index 0000000000..f2ce5f6b8a --- /dev/null +++ b/activemq-client/src/main/java/org/apache/activemq/transport/TransmitCallback.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport; + +public interface TransmitCallback { + + void onSuccess(); + + void onFailure(); + +}