Add expanded transmit callback interface so that a failure to transmit can be distinguished from normal operation and allow for no further attempts at dispatch fixing the current NPE when async dispatch is enabled. 

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1432487 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-01-12 18:13:27 +00:00
parent 2a5d0821b1
commit ae61847d02
6 changed files with 224 additions and 43 deletions

View File

@ -36,10 +36,44 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.transaction.xa.XAResource;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.*;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ControlCommand;
import org.apache.activemq.command.DataArrayResponse;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.FlushCommand;
import org.apache.activemq.command.IntegerResponse;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.network.DemandForwardingBridge;
import org.apache.activemq.network.MBeanNetworkListener;
import org.apache.activemq.network.NetworkBridgeConfiguration;
@ -57,12 +91,12 @@ import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transaction.Transaction;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.TransmitCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -174,6 +208,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
*
* @return size of dispatch queue
*/
@Override
public int getDispatchQueueSize() {
synchronized (dispatchQueue) {
return dispatchQueue.size();
@ -207,8 +242,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
private boolean expected(IOException e) {
return isStomp() &&
((e instanceof SocketException && e.getMessage().indexOf("reset") != -1) || e instanceof EOFException);
return isStomp() && ((e instanceof SocketException && e.getMessage().indexOf("reset") != -1) || e instanceof EOFException);
}
private boolean isStomp() {
@ -221,6 +255,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
* service exception closes a socket, we should not tie up broker threads
* since client sockets may hang or cause deadlocks.
*/
@Override
public void serviceExceptionAsync(final IOException e) {
if (asyncException.compareAndSet(false, true)) {
new Thread("Async Exception Handler") {
@ -237,6 +272,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
* if: the client is closing or broker is closing. Otherwise, the connection
* error transmitted to the client before stopping it's transport.
*/
@Override
public void serviceException(Throwable e) {
// are we a transport exception such as not being able to dispatch
// synchronously to a transport
@ -282,6 +318,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
}
@Override
public Response service(Command command) {
MDC.put("activemq.connector", connector.getUri().toString());
Response response = null;
@ -324,30 +361,36 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return response;
}
@Override
public Response processKeepAlive(KeepAliveInfo info) throws Exception {
return null;
}
@Override
public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info);
return null;
}
@Override
public Response processWireFormat(WireFormatInfo info) throws Exception {
wireFormatInfo = info;
protocolVersion.set(info.getVersion());
return null;
}
@Override
public Response processShutdown(ShutdownInfo info) throws Exception {
stopAsync();
return null;
}
@Override
public Response processFlush(FlushCommand command) throws Exception {
return null;
}
@Override
public Response processBeginTransaction(TransactionInfo info) throws Exception {
TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
context = null;
@ -365,6 +408,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return null;
}
@Override
public Response processEndTransaction(TransactionInfo info) throws Exception {
// No need to do anything. This packet is just sent by the client
// make sure he is synced with the server as commit command could
@ -372,6 +416,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return null;
}
@Override
public Response processPrepareTransaction(TransactionInfo info) throws Exception {
TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
context = null;
@ -403,6 +448,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
}
@Override
public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
context = cs.getContext();
@ -411,6 +457,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return null;
}
@Override
public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
context = cs.getContext();
@ -419,6 +466,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return null;
}
@Override
public Response processRollbackTransaction(TransactionInfo info) throws Exception {
TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
context = cs.getContext();
@ -427,6 +475,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return null;
}
@Override
public Response processForgetTransaction(TransactionInfo info) throws Exception {
TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
context = cs.getContext();
@ -434,6 +483,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return null;
}
@Override
public Response processRecoverTransactions(TransactionInfo info) throws Exception {
TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
context = cs.getContext();
@ -441,6 +491,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return new DataArrayResponse(preparedTransactions);
}
@Override
public Response processMessage(Message messageSend) throws Exception {
ProducerId producerId = messageSend.getProducerId();
ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
@ -450,6 +501,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return null;
}
@Override
public Response processMessageAck(MessageAck ack) throws Exception {
ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId());
if (consumerExchange != null) {
@ -458,15 +510,18 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return null;
}
@Override
public Response processMessagePull(MessagePull pull) throws Exception {
return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull);
}
@Override
public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception {
broker.processDispatchNotification(notification);
return null;
}
@Override
public Response processAddDestination(DestinationInfo info) throws Exception {
TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
broker.addDestinationInfo(cs.getContext(), info);
@ -476,6 +531,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return null;
}
@Override
public Response processRemoveDestination(DestinationInfo info) throws Exception {
TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
broker.removeDestinationInfo(cs.getContext(), info);
@ -485,6 +541,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return null;
}
@Override
public Response processAddProducer(ProducerInfo info) throws Exception {
SessionId sessionId = info.getProducerId().getParentId();
ConnectionId connectionId = sessionId.getParentId();
@ -517,6 +574,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return null;
}
@Override
public Response processRemoveProducer(ProducerId id) throws Exception {
SessionId sessionId = id.getParentId();
ConnectionId connectionId = sessionId.getParentId();
@ -535,6 +593,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return null;
}
@Override
public Response processAddConsumer(ConsumerInfo info) throws Exception {
SessionId sessionId = info.getConsumerId().getParentId();
ConnectionId connectionId = sessionId.getParentId();
@ -569,6 +628,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return null;
}
@Override
public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception {
SessionId sessionId = id.getParentId();
ConnectionId connectionId = sessionId.getParentId();
@ -593,6 +653,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return null;
}
@Override
public Response processAddSession(SessionInfo info) throws Exception {
ConnectionId connectionId = info.getSessionId().getParentId();
TransportConnectionState cs = lookupConnectionState(connectionId);
@ -609,6 +670,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return null;
}
@Override
public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception {
ConnectionId connectionId = id.getParentId();
TransportConnectionState cs = lookupConnectionState(connectionId);
@ -642,6 +704,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return null;
}
@Override
public Response processAddConnection(ConnectionInfo info) throws Exception {
// Older clients should have been defaulting this field to true.. but
// they were not.
@ -728,6 +791,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return null;
}
@Override
public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId)
throws InterruptedException {
LOG.debug("remove connection id: " + id);
@ -776,15 +840,18 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return null;
}
@Override
public Response processProducerAck(ProducerAck ack) throws Exception {
// A broker should not get ProducerAck messages.
return null;
}
@Override
public Connector getConnector() {
return connector;
}
@Override
public void dispatchSync(Command message) {
try {
processDispatch(message);
@ -793,6 +860,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
}
@Override
public void dispatchAsync(Command message) {
if (!stopping.get()) {
if (taskRunner == null) {
@ -810,17 +878,17 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
} else {
if (message.isMessageDispatch()) {
MessageDispatch md = (MessageDispatch) message;
Runnable sub = md.getTransmitCallback();
TransmitCallback sub = md.getTransmitCallback();
broker.postProcessDispatch(md);
if (sub != null) {
sub.run();
sub.onFailure();
}
}
}
}
protected void processDispatch(Command command) throws IOException {
final MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null);
MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null);
try {
if (!stopping.get()) {
if (messageDispatch != null) {
@ -828,17 +896,27 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
dispatch(command);
}
} finally {
} catch (Throwable e) {
if (messageDispatch != null) {
Runnable sub = messageDispatch.getTransmitCallback();
TransmitCallback sub = messageDispatch.getTransmitCallback();
broker.postProcessDispatch(messageDispatch);
if (sub != null) {
sub.run();
sub.onFailure();
}
messageDispatch = null;
}
} finally {
if (messageDispatch != null) {
TransmitCallback sub = messageDispatch.getTransmitCallback();
broker.postProcessDispatch(messageDispatch);
if (sub != null) {
sub.onSuccess();
}
}
}
}
@Override
public boolean iterate() {
try {
if (pendingStop || stopping.get()) {
@ -877,6 +955,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
/**
* Returns the statistics for this connection
*/
@Override
public ConnectionStatistics getStatistics() {
return statistics;
}
@ -889,10 +968,12 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
this.messageAuthorizationPolicy = messageAuthorizationPolicy;
}
@Override
public boolean isManageable() {
return manageable;
}
@Override
public void start() throws Exception {
try {
synchronized (this) {
@ -931,6 +1012,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
}
@Override
public void stop() throws Exception {
// do not stop task the task runner factories (taskRunnerFactory, stopTaskRunnerFactory)
// as their lifecycle is handled elsewhere
@ -949,6 +1031,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
try {
stopTaskRunnerFactory.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(waitTime);
@ -985,6 +1068,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
try {
stopTaskRunnerFactory.execute(new Runnable() {
@Override
public void run() {
serviceLock.writeLock().lock();
try {
@ -1039,10 +1123,10 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
Command command = iter.next();
if (command.isMessageDispatch()) {
MessageDispatch md = (MessageDispatch) command;
Runnable sub = md.getTransmitCallback();
TransmitCallback sub = md.getTransmitCallback();
broker.postProcessDispatch(md);
if (sub != null) {
sub.run();
sub.onFailure();
}
}
}
@ -1109,6 +1193,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
/**
* @return true if the Connection is slow
*/
@Override
public boolean isSlow() {
return slow;
}
@ -1132,6 +1217,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
/**
* @return if after being marked, the Connection is still writing
*/
@Override
public boolean isBlocked() {
return blocked;
}
@ -1139,6 +1225,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
/**
* @return true if the Connection is connected
*/
@Override
public boolean isConnected() {
return connected;
}
@ -1160,6 +1247,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
/**
* @return true if the Connection is active
*/
@Override
public boolean isActive() {
return active;
}
@ -1178,10 +1266,12 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return starting;
}
@Override
public synchronized boolean isNetworkConnection() {
return networkConnection;
}
@Override
public boolean isFaultTolerantConnection() {
return this.faultTolerantConnection;
}
@ -1201,9 +1291,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
this.pendingStop = pendingStop;
}
@Override
public Response processBrokerInfo(BrokerInfo info) {
if (info.isSlaveBroker()) {
BrokerService bService = connector.getBrokerService();
LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: " + info.getBrokerName());
} else if (info.isNetworkConnection() && info.isDuplexConnection()) {
// so this TransportConnection is the rear end of a network bridge
@ -1291,10 +1381,12 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
}
@Override
public String getRemoteAddress() {
return transport.getRemoteAddress();
}
@Override
public String getConnectionId() {
List<TransportConnectionState> connectionStates = listConnectionStates();
for (TransportConnectionState cs : connectionStates) {
@ -1306,6 +1398,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return null;
}
@Override
public void updateClient(ConnectionControl control) {
if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null
&& this.wireFormatInfo.getVersion() >= 6) {
@ -1388,6 +1481,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return protocolVersion.get();
}
@Override
public Response processControlCommand(ControlCommand command) throws Exception {
String control = command.getCommand();
if (control != null && control.equals("shutdown")) {
@ -1396,10 +1490,12 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return null;
}
@Override
public Response processMessageDispatch(MessageDispatch dispatch) throws Exception {
return null;
}
@Override
public Response processConnectionControl(ConnectionControl control) throws Exception {
if (control != null) {
faultTolerantConnection = control.isFaultTolerant();
@ -1407,10 +1503,12 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return null;
}
@Override
public Response processConnectionError(ConnectionError error) throws Exception {
return null;
}
@Override
public Response processConsumerControl(ConsumerControl control) throws Exception {
ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(control.getConsumerId());
broker.processConsumerControl(consumerExchange, control);

View File

@ -24,7 +24,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
@ -43,6 +42,7 @@ import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.transport.TransmitCallback;
import org.apache.activemq.usage.SystemUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -88,6 +88,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
/**
* Allows a message to be pulled on demand by a client
*/
@Override
public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
// The slave should not deliver pull messages.
// TODO: when the slave becomes a master, He should send a NULL message to all the
@ -143,6 +144,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
}
}
@Override
public void add(MessageReference node) throws Exception {
synchronized (pendingLock) {
// The destination may have just been removed...
@ -160,6 +162,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
dispatchPending();
}
@Override
public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
synchronized(pendingLock) {
try {
@ -189,6 +192,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
+ mdn.getConsumerId() + " on " + mdn.getDestination().getPhysicalName());
}
@Override
public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
// Handle the standard acknowledgment case.
boolean callDispatchMatched = false;
@ -305,7 +309,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
Destination nodeDest = (Destination) node.getRegionDestination();
if (node.isExpired()) {
if (broker.isExpired(node)) {
Destination regionDestination = (Destination) nodeDest;
Destination regionDestination = nodeDest;
regionDestination.messageExpired(context, this, node);
}
iter.remove();
@ -500,6 +504,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
broker.getRoot().sendToDeadLetterQueue(context, node, this);
}
@Override
public int getInFlightSize() {
return dispatched.size();
}
@ -509,6 +514,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
*
* @return
*/
@Override
public boolean isFull() {
return dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize();
}
@ -516,6 +522,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
/**
* @return true when 60% or more room is left for dispatching messages
*/
@Override
public boolean isLowWaterMark() {
return (dispatched.size() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4);
}
@ -523,6 +530,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
/**
* @return true when 10% or less room is left for dispatching messages
*/
@Override
public boolean isHighWaterMark() {
return (dispatched.size() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9);
}
@ -532,22 +540,27 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
return info.getPrefetchSize() + prefetchExtension.get() - dispatched.size();
}
@Override
public int getPendingQueueSize() {
return pending.size();
}
@Override
public int getDispatchedQueueSize() {
return dispatched.size();
}
@Override
public long getDequeueCounter() {
return dequeueCounter;
}
@Override
public long getDispatchedCounter() {
return dispatchCounter;
}
@Override
public long getEnqueueCounter() {
return enqueueCounter;
}
@ -613,8 +626,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
setPendingBatchSize(pending, numberToDispatch);
int count = 0;
pending.reset();
while (pending.hasNext() && !isFull()
&& count < numberToDispatch) {
while (pending.hasNext() && !isFull() && count < numberToDispatch) {
MessageReference node = pending.next();
if (node == null) {
break;
@ -683,15 +695,29 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
}
}
if (info.isDispatchAsync()) {
md.setTransmitCallback(new Runnable() {
md.setTransmitCallback(new TransmitCallback() {
public void run() {
// Since the message gets queued up in async dispatch,
// we don't want to
// decrease the reference count until it gets put on the
// wire.
@Override
public void onSuccess() {
// Since the message gets queued up in async dispatch, we don't want to
// decrease the reference count until it gets put on the wire.
onDispatch(node, message);
}
@Override
public void onFailure() {
Destination nodeDest = (Destination) node.getRegionDestination();
if (nodeDest != null) {
if (node != QueueMessageReference.NULL_MESSAGE) {
nodeDest.getDestinationStatistics().getDispatched().increment();
nodeDest.getDestinationStatistics().getInflight().increment();
if (LOG.isTraceEnabled()) {
LOG.trace(info.getConsumerId() + " failed to dispatch: " + message.getMessageId() + " - "
+ message.getDestination() + ", dispatched: " + dispatchCounter + ", inflight: " + dispatched.size());
}
}
}
}
});
context.getConnection().dispatchAsync(md);
} else {
@ -728,6 +754,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
*
* @param newPrefetch
*/
@Override
public void updateConsumerPrefetch(int newPrefetch) {
if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
ConsumerControl cc = new ConsumerControl();

View File

@ -41,6 +41,7 @@ import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.transport.TransmitCallback;
import org.apache.activemq.usage.SystemUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -96,6 +97,7 @@ public class TopicSubscription extends AbstractSubscription {
this.active=true;
}
@Override
public void add(MessageReference node) throws Exception {
if (isDuplicate(node)) {
return;
@ -236,6 +238,7 @@ public class TopicSubscription extends AbstractSubscription {
}
}
@Override
public void processMessageDispatchNotification(MessageDispatchNotification mdn) {
synchronized (matchedListMutex) {
try {
@ -256,6 +259,7 @@ public class TopicSubscription extends AbstractSubscription {
}
}
@Override
public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
// Handle the standard acknowledgment case.
if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) {
@ -299,6 +303,7 @@ public class TopicSubscription extends AbstractSubscription {
throw new JMSException("Invalid acknowledgment: " + ack);
}
@Override
public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
// The slave should not deliver pull messages.
@ -320,6 +325,7 @@ public class TopicSubscription extends AbstractSubscription {
if (pull.getTimeout() > 0) {
scheduler.executeAfterDelay(new Runnable() {
@Override
public void run() {
pullTimeout();
}
@ -346,10 +352,12 @@ public class TopicSubscription extends AbstractSubscription {
}
}
@Override
public int getPendingQueueSize() {
return matched();
}
@Override
public int getDispatchedQueueSize() {
return (int)(dispatchedCounter.get() - dequeueCounter.get());
}
@ -358,14 +366,17 @@ public class TopicSubscription extends AbstractSubscription {
return maximumPendingMessages;
}
@Override
public long getDispatchedCounter() {
return dispatchedCounter.get();
}
@Override
public long getEnqueueCounter() {
return enqueueCounter.get();
}
@Override
public long getDequeueCounter() {
return dequeueCounter.get();
}
@ -445,10 +456,12 @@ public class TopicSubscription extends AbstractSubscription {
// Implementation methods
// -------------------------------------------------------------------------
@Override
public boolean isFull() {
return getDispatchedQueueSize() >= info.getPrefetchSize() && !prefetchWindowOpen.get();
}
@Override
public int getInFlightSize() {
return getDispatchedQueueSize();
}
@ -456,6 +469,7 @@ public class TopicSubscription extends AbstractSubscription {
/**
* @return true when 60% or more room is left for dispatching messages
*/
@Override
public boolean isLowWaterMark() {
return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4);
}
@ -463,6 +477,7 @@ public class TopicSubscription extends AbstractSubscription {
/**
* @return true when 10% or less room is left for dispatching messages
*/
@Override
public boolean isHighWaterMark() {
return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9);
}
@ -507,6 +522,7 @@ public class TopicSubscription extends AbstractSubscription {
*
* @param newPrefetch
*/
@Override
public void updateConsumerPrefetch(int newPrefetch) {
if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
ConsumerControl cc = new ConsumerControl();
@ -567,9 +583,18 @@ public class TopicSubscription extends AbstractSubscription {
}
if (info.isDispatchAsync()) {
if (node != null) {
md.setTransmitCallback(new Runnable() {
md.setTransmitCallback(new TransmitCallback() {
@Override
public void run() {
public void onSuccess() {
Destination regionDestination = (Destination) node.getRegionDestination();
regionDestination.getDestinationStatistics().getDispatched().increment();
regionDestination.getDestinationStatistics().getInflight().increment();
node.decrementReferenceCount();
}
@Override
public void onFailure() {
Destination regionDestination = (Destination) node.getRegionDestination();
regionDestination.getDestinationStatistics().getDispatched().increment();
regionDestination.getDestinationStatistics().getInflight().increment();
@ -612,6 +637,7 @@ public class TopicSubscription extends AbstractSubscription {
+ getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded();
}
@Override
public void destroy() {
this.active=false;
synchronized (matchedListMutex) {

View File

@ -21,6 +21,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
@ -36,8 +37,6 @@ import org.slf4j.LoggerFactory;
/**
* persist pending messages pending message (messages awaiting dispatch to a
* consumer) cursor
*
*
*/
public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
@ -50,6 +49,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
private PendingMessageCursor currentCursor;
private final DurableTopicSubscription subscription;
private boolean immediatePriorityDispatch = true;
/**
* @param broker Broker for this cursor
* @param clientId clientId for this cursor
@ -290,6 +290,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
@Override
public synchronized void release() {
this.currentCursor = null;
for (PendingMessageCursor storePrefetch : storePrefetches) {
storePrefetch.release();
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.command;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.transport.TransmitCallback;
/**
*
@ -34,13 +35,15 @@ public class MessageDispatch extends BaseCommand {
protected transient long deliverySequenceId;
protected transient Object consumer;
protected transient Runnable transmitCallback;
protected transient TransmitCallback transmitCallback;
protected transient Throwable rollbackCause;
@Override
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}
@Override
public boolean isMessageDispatch() {
return true;
}
@ -105,15 +108,16 @@ public class MessageDispatch extends BaseCommand {
this.consumer = consumer;
}
@Override
public Response visit(CommandVisitor visitor) throws Exception {
return visitor.processMessageDispatch(this);
}
public Runnable getTransmitCallback() {
public TransmitCallback getTransmitCallback() {
return transmitCallback;
}
public void setTransmitCallback(Runnable transmitCallback) {
public void setTransmitCallback(TransmitCallback transmitCallback) {
this.transmitCallback = transmitCallback;
}

View File

@ -0,0 +1,25 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport;
public interface TransmitCallback {
void onSuccess();
void onFailure();
}