Added now option to connection and consumer to allow for configuration of a time interval upon which all the outstanding acks are delivered when optimized acknowledge is used so that a long running consumer that doesn't receive any more messages will eventually ack the last few unacked messages.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1387079 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-09-18 10:12:30 +00:00
parent 183ae75048
commit 3d5a758666
5 changed files with 189 additions and 52 deletions

View File

@ -146,6 +146,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private boolean useAsyncSend;
private boolean optimizeAcknowledge;
private long optimizeAcknowledgeTimeOut = 0;
private long optimizedAckScheduledAckInterval = 0;
private boolean nestedMapAndListEnabled = true;
private boolean useRetroactiveConsumer;
private boolean exclusiveConsumer;
@ -484,8 +485,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
*
* @return the listener or <code>null</code> if no listener is registered with the connection.
*/
public ClientInternalExceptionListener getClientInternalExceptionListener()
{
public ClientInternalExceptionListener getClientInternalExceptionListener() {
return clientInternalExceptionListener;
}
@ -498,8 +498,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
*
* @param listener the exception listener
*/
public void setClientInternalExceptionListener(ClientInternalExceptionListener listener)
{
public void setClientInternalExceptionListener(ClientInternalExceptionListener listener) {
this.clientInternalExceptionListener = listener;
}
@ -1775,7 +1774,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
this.sendAcksAsync = sendAcksAsync;
}
/**
* Returns the time this connection was created
*/
@ -1901,8 +1899,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
} catch (Exception e) {
onClientInternalException(e);
}
}
for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
TransportListener listener = iter.next();
listener.onCommand(command);
@ -1937,6 +1935,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
}
}
}
/**
* Used for handling async exceptions
*
@ -1976,8 +1975,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
} catch (JMSException e) {
LOG.warn("Exception during connection cleanup, " + e, e);
}
for (Iterator<TransportListener> iter = transportListeners
.iterator(); iter.hasNext();) {
for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
TransportListener listener = iter.next();
listener.onException(error);
}
@ -2051,9 +2049,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
checkClosedOrFailed();
for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next();
if (s.isInUse(destination)) {
for (ActiveMQSession session : this.sessions) {
if (session.isInUse(destination)) {
throw new JMSException("A consumer is consuming from the temporary destination");
}
}
@ -2109,7 +2106,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
info.setDestination(destination);
info.setTimeout(0);
syncSendPacket(info);
}
public boolean isDispatchAsync() {
@ -2160,8 +2156,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
return createInputStream(dest, messageSelector, noLocal, -1);
}
public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException {
return doCreateInputStream(dest, messageSelector, noLocal, null, timeout);
}
@ -2276,12 +2270,9 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
msg.setJMSExpiration(expiration);
msg.setJMSPriority(priority);
msg.setJMSRedelivered(false);
msg.setMessageId(messageId);
msg.onSend();
msg.setProducerId(msg.getMessageId().getProducerId());
if (LOG.isDebugEnabled()) {
@ -2293,7 +2284,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
} else {
syncSendPacket(msg);
}
}
public void addOutputStream(ActiveMQOutputStream stream) {
@ -2319,13 +2309,15 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
LOG.info("JVM told to shutdown");
System.exit(0);
}
if (false && "close".equals(text)){
LOG.error("Broker " + getBrokerInfo() + "shutdown connection");
try {
close();
} catch (JMSException e) {
}
}
// TODO Should we handle the "close" case?
// if (false && "close".equals(text)){
// LOG.error("Broker " + getBrokerInfo() + "shutdown connection");
// try {
// close();
// } catch (JMSException e) {
// }
// }
}
}
@ -2341,14 +2333,12 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
protected void onConsumerControl(ConsumerControl command) {
if (command.isClose()) {
for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next();
s.close(command.getConsumerId());
for (ActiveMQSession session : this.sessions) {
session.close(command.getConsumerId());
}
} else {
for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next();
s.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
for (ActiveMQSession session : this.sessions) {
session.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
}
}
}
@ -2517,7 +2507,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
this.checkForDuplicates = checkForDuplicates;
}
public boolean isTransactedIndividualAck() {
return transactedIndividualAck;
}
@ -2607,4 +2596,25 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
this.rejectedTaskHandler = rejectedTaskHandler;
}
/**
* Gets the configured time interval that is used to force all MessageConsumers that have optimizedAcknowledge enabled
* to send an ack for any outstanding Message Acks. By default this value is set to zero meaning that the consumers
* will not do any background Message acknowledgment.
*
* @return the scheduledOptimizedAckInterval
*/
public long getOptimizedAckScheduledAckInterval() {
return optimizedAckScheduledAckInterval;
}
/**
* Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that
* have been configured with optimizeAcknowledge enabled.
*
* @param scheduledOptimizedAckInterval the scheduledOptimizedAckInterval to set
*/
public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
}
}

View File

@ -92,6 +92,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
private boolean disableTimeStampsByDefault;
private boolean optimizedMessageDispatch = true;
private long optimizeAcknowledgeTimeOut = 300;
private long optimizedAckScheduledAckInterval = 0;
private boolean copyMessageOnSend = true;
private boolean useCompression;
private boolean objectMessageSerializationDefered;
@ -312,6 +313,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
connection.setOptimizeAcknowledgeTimeOut(getOptimizeAcknowledgeTimeOut());
connection.setOptimizedAckScheduledAckInterval(getOptimizedAckScheduledAckInterval());
connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
connection.setExclusiveConsumer(isExclusiveConsumer());
connection.setRedeliveryPolicyMap(getRedeliveryPolicyMap());
@ -1117,4 +1119,25 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
this.rejectedTaskHandler = rejectedTaskHandler;
}
/**
* Gets the configured time interval that is used to force all MessageConsumers that have optimizedAcknowledge enabled
* to send an ack for any outstanding Message Acks. By default this value is set to zero meaning that the consumers
* will not do any background Message acknowledgment.
*
* @return the scheduledOptimizedAckInterval
*/
public long getOptimizedAckScheduledAckInterval() {
return optimizedAckScheduledAckInterval;
}
/**
* Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that
* have been configured with optimizeAcknowledge enabled.
*
* @param scheduledOptimizedAckInterval the scheduledOptimizedAckInterval to set
*/
public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
}
}

View File

@ -152,6 +152,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
private long optimizeAckTimestamp = System.currentTimeMillis();
private long optimizeAcknowledgeTimeOut = 0;
private long optimizedAckScheduledAckInterval = 0;
private Runnable optimizedAckTask;
private long failoverRedeliveryWaitPeriod = 0;
private boolean transactedIndividualAck = false;
private boolean nonBlockingRedelivery = false;
@ -189,13 +191,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue();
if (physicalName.indexOf(connectionID) < 0) {
throw new InvalidDestinationException(
"Cannot use a Temporary destination from another Connection");
throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
}
if (session.connection.isDeleted(dest)) {
throw new InvalidDestinationException(
"Cannot use a Temporary destination that has been deleted");
throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
}
if (prefetch < 0) {
throw new JMSException("Cannot have a prefetch size less than zero");
@ -258,7 +258,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
&& !info.isBrowser();
if (this.optimizeAcknowledge) {
this.optimizeAcknowledgeTimeOut = session.connection.getOptimizeAcknowledgeTimeOut();
setOptimizedAckScheduledAckInterval(session.connection.getOptimizedAckScheduledAckInterval());
}
this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod();
this.nonBlockingRedelivery = session.connection.isNonBlockingRedelivery();
@ -415,8 +417,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
public void setMessageListener(MessageListener listener) throws JMSException {
checkClosed();
if (info.getPrefetchSize() == 0) {
throw new JMSException(
"Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
}
if (listener != null) {
boolean wasRunning = session.isRunning();
@ -551,7 +552,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
session.acknowledge();
}
});
}else if (session.isIndividualAcknowledge()) {
} else if (session.isIndividualAcknowledge()) {
m.setAcknowledgeCallback(new Callback() {
public void execute() throws Exception {
session.checkClosed();
@ -683,7 +684,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
this.session.asyncSendPacket(removeCommand);
if (interrupted) {
Thread.currentThread().interrupt();
} }
}
}
void inProgressClearRequired() {
inProgressClearRequiredFlag.incrementAndGet();
@ -772,6 +774,10 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
ThreadPoolUtils.shutdownGraceful(executorService, 60000L);
executorService = null;
}
if (optimizedAckTask != null) {
this.session.connection.getScheduler().cancel(optimizedAckTask);
optimizedAckTask = null;
}
if (session.isClientAcknowledge()) {
if (!this.info.isBrowser()) {
@ -888,8 +894,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
if (!deliveredMessages.isEmpty()) {
if (optimizeAcknowledge) {
ackCounter++;
// AMQ-3956 evaluate both expired and normal msgs as
// AMQ-3956 evaluate both expired and normal msgs as
// otherwise consumer may get stalled
if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
@ -899,16 +905,16 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
session.sendAck(ack);
optimizeAckTimestamp = System.currentTimeMillis();
}
// AMQ-3956 - as further optimization send
// AMQ-3956 - as further optimization send
// ack for expired msgs when there are any.
// This resets the deliveredCounter to 0 so that
// we won't sent standard acks with every msg just
// because the deliveredCounter just below
// because the deliveredCounter just below
// 0.5 * prefetch as used in ackLater()
if (pendingAck != null && deliveredCounter > 0) {
session.sendAck(pendingAck);
pendingAck = null;
deliveredCounter = 0;
session.sendAck(pendingAck);
pendingAck = null;
deliveredCounter = 0;
}
}
} else {
@ -989,7 +995,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
}
}
// AMQ-3956 evaluate both expired and normal msgs as
// AMQ-3956 evaluate both expired and normal msgs as
// otherwise consumer may get stalled
if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter + ackCounter - additionalWindowSize)) {
session.sendAck(pendingAck);
@ -1471,4 +1477,55 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
public void setFailureError(IOException failureError) {
this.failureError = failureError;
}
/**
* @return the optimizedAckScheduledAckInterval
*/
public long getOptimizedAckScheduledAckInterval() {
return optimizedAckScheduledAckInterval;
}
/**
* @param optimizedAckScheduledAckInterval the optimizedAckScheduledAckInterval to set
*/
public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) throws JMSException {
this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
if (this.optimizedAckTask != null) {
try {
this.session.connection.getScheduler().cancel(optimizedAckTask);
} catch (JMSException e) {
LOG.debug("Caught exception while cancelling old optimized ack task", e);
throw e;
}
this.optimizedAckTask = null;
}
// Should we periodically send out all outstanding acks.
if (this.optimizeAcknowledge && this.optimizedAckScheduledAckInterval > 0) {
this.optimizedAckTask = new Runnable() {
@Override
public void run() {
try {
if (optimizeAcknowledge && !unconsumedMessages.isClosed()) {
if (LOG.isInfoEnabled()) {
LOG.info("Consumer:{} is performing scheduled delivery of outstanding optimized Acks", info.getConsumerId());
}
deliverAcks();
}
} catch (Exception e) {
LOG.debug("Optimized Ack Task caught exception during ack", e);
}
}
};
try {
this.session.connection.getScheduler().executePeriodically(optimizedAckTask, optimizedAckScheduledAckInterval);
} catch (JMSException e) {
LOG.debug("Caught exception while scheduling new optimized ack task", e);
throw e;
}
}
}
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.thread;
import java.util.HashMap;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
@ -54,7 +55,7 @@ public final class Scheduler extends ServiceSupport {
TimerTask ticket = timerTasks.remove(task);
if (ticket != null) {
ticket.cancel();
timer.purge();//remove cancelled TimerTasks
timer.purge(); // remove cancelled TimerTasks
}
}
@ -70,7 +71,6 @@ public final class Scheduler extends ServiceSupport {
@Override
protected synchronized void doStart() throws Exception {
this.timer = new Timer(name, true);
}
@Override
@ -78,7 +78,6 @@ public final class Scheduler extends ServiceSupport {
if (this.timer != null) {
this.timer.cancel();
}
}
public String getName() {

View File

@ -16,10 +16,13 @@
*/
package org.apache.activemq;
import java.util.concurrent.TimeUnit;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.util.Wait;
@ -82,7 +85,6 @@ public class OptimizedAckTest extends TestSupport {
}
}
public void testVerySlowReceivedMessageStillInflight() throws Exception {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -119,4 +121,50 @@ public class OptimizedAckTest extends TestSupport {
}
}
}
public void testReceivedMessageNotInFlightAfterScheduledAckFires() throws Exception {
connection.setOptimizedAckScheduledAckInterval(TimeUnit.SECONDS.toMillis(10));
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("test");
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 10; i++) {
producer.send(session.createTextMessage("Hello" + i));
}
final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker();
MessageConsumer consumer = session.createConsumer(queue);
assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount());
return 10 == regionBroker.getDestinationStatistics().getInflight().getCount();
}
}));
for (int i=0; i<10; i++) {
javax.jms.Message msg = consumer.receive(4000);
assertNotNull(msg);
if (i<7) {
assertEquals("all prefetch is still in flight", 10, regionBroker.getDestinationStatistics().getInflight().getCount());
} else {
assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition(){
@Override
public boolean isSatisified() throws Exception {
LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount());
return 3 == regionBroker.getDestinationStatistics().getInflight().getCount();
}
}));
}
}
assertTrue("After delay the scheduled ack should ack all inflight.", Wait.waitFor(new Wait.Condition(){
@Override
public boolean isSatisified() throws Exception {
LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount());
return 0 == regionBroker.getDestinationStatistics().getInflight().getCount();
}
}));
}
}