diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
index eaa8d07265..66b1560563 100755
--- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
+++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
@@ -146,6 +146,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private boolean useAsyncSend;
private boolean optimizeAcknowledge;
private long optimizeAcknowledgeTimeOut = 0;
+ private long optimizedAckScheduledAckInterval = 0;
private boolean nestedMapAndListEnabled = true;
private boolean useRetroactiveConsumer;
private boolean exclusiveConsumer;
@@ -484,8 +485,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
*
* @return the listener or null
if no listener is registered with the connection.
*/
- public ClientInternalExceptionListener getClientInternalExceptionListener()
- {
+ public ClientInternalExceptionListener getClientInternalExceptionListener() {
return clientInternalExceptionListener;
}
@@ -498,8 +498,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
*
* @param listener the exception listener
*/
- public void setClientInternalExceptionListener(ClientInternalExceptionListener listener)
- {
+ public void setClientInternalExceptionListener(ClientInternalExceptionListener listener) {
this.clientInternalExceptionListener = listener;
}
@@ -1775,7 +1774,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
this.sendAcksAsync = sendAcksAsync;
}
-
/**
* Returns the time this connection was created
*/
@@ -1901,8 +1899,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
} catch (Exception e) {
onClientInternalException(e);
}
-
}
+
for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
TransportListener listener = iter.next();
listener.onCommand(command);
@@ -1937,6 +1935,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
}
}
}
+
/**
* Used for handling async exceptions
*
@@ -1976,8 +1975,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
} catch (JMSException e) {
LOG.warn("Exception during connection cleanup, " + e, e);
}
- for (Iterator iter = transportListeners
- .iterator(); iter.hasNext();) {
+ for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
TransportListener listener = iter.next();
listener.onException(error);
}
@@ -2051,9 +2049,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
checkClosedOrFailed();
- for (Iterator i = this.sessions.iterator(); i.hasNext();) {
- ActiveMQSession s = i.next();
- if (s.isInUse(destination)) {
+ for (ActiveMQSession session : this.sessions) {
+ if (session.isInUse(destination)) {
throw new JMSException("A consumer is consuming from the temporary destination");
}
}
@@ -2109,7 +2106,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
info.setDestination(destination);
info.setTimeout(0);
syncSendPacket(info);
-
}
public boolean isDispatchAsync() {
@@ -2160,8 +2156,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
return createInputStream(dest, messageSelector, noLocal, -1);
}
-
-
public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException {
return doCreateInputStream(dest, messageSelector, noLocal, null, timeout);
}
@@ -2276,12 +2270,9 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
msg.setJMSExpiration(expiration);
msg.setJMSPriority(priority);
-
msg.setJMSRedelivered(false);
msg.setMessageId(messageId);
-
msg.onSend();
-
msg.setProducerId(msg.getMessageId().getProducerId());
if (LOG.isDebugEnabled()) {
@@ -2293,7 +2284,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
} else {
syncSendPacket(msg);
}
-
}
public void addOutputStream(ActiveMQOutputStream stream) {
@@ -2319,13 +2309,15 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
LOG.info("JVM told to shutdown");
System.exit(0);
}
- if (false && "close".equals(text)){
- LOG.error("Broker " + getBrokerInfo() + "shutdown connection");
- try {
- close();
- } catch (JMSException e) {
- }
- }
+
+ // TODO Should we handle the "close" case?
+ // if (false && "close".equals(text)){
+ // LOG.error("Broker " + getBrokerInfo() + "shutdown connection");
+ // try {
+ // close();
+ // } catch (JMSException e) {
+ // }
+ // }
}
}
@@ -2341,14 +2333,12 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
protected void onConsumerControl(ConsumerControl command) {
if (command.isClose()) {
- for (Iterator i = this.sessions.iterator(); i.hasNext();) {
- ActiveMQSession s = i.next();
- s.close(command.getConsumerId());
+ for (ActiveMQSession session : this.sessions) {
+ session.close(command.getConsumerId());
}
} else {
- for (Iterator i = this.sessions.iterator(); i.hasNext();) {
- ActiveMQSession s = i.next();
- s.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
+ for (ActiveMQSession session : this.sessions) {
+ session.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
}
}
}
@@ -2517,7 +2507,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
this.checkForDuplicates = checkForDuplicates;
}
-
public boolean isTransactedIndividualAck() {
return transactedIndividualAck;
}
@@ -2607,4 +2596,25 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
this.rejectedTaskHandler = rejectedTaskHandler;
}
+
+ /**
+ * Gets the configured time interval that is used to force all MessageConsumers that have optimizedAcknowledge enabled
+ * to send an ack for any outstanding Message Acks. By default this value is set to zero meaning that the consumers
+ * will not do any background Message acknowledgment.
+ *
+ * @return the scheduledOptimizedAckInterval
+ */
+ public long getOptimizedAckScheduledAckInterval() {
+ return optimizedAckScheduledAckInterval;
+ }
+
+ /**
+ * Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that
+ * have been configured with optimizeAcknowledge enabled.
+ *
+ * @param scheduledOptimizedAckInterval the scheduledOptimizedAckInterval to set
+ */
+ public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
+ this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
+ }
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
index dd014f25aa..343a67f0b9 100755
--- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
+++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
@@ -92,6 +92,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
private boolean disableTimeStampsByDefault;
private boolean optimizedMessageDispatch = true;
private long optimizeAcknowledgeTimeOut = 300;
+ private long optimizedAckScheduledAckInterval = 0;
private boolean copyMessageOnSend = true;
private boolean useCompression;
private boolean objectMessageSerializationDefered;
@@ -312,6 +313,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
connection.setOptimizeAcknowledgeTimeOut(getOptimizeAcknowledgeTimeOut());
+ connection.setOptimizedAckScheduledAckInterval(getOptimizedAckScheduledAckInterval());
connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
connection.setExclusiveConsumer(isExclusiveConsumer());
connection.setRedeliveryPolicyMap(getRedeliveryPolicyMap());
@@ -1117,4 +1119,25 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
this.rejectedTaskHandler = rejectedTaskHandler;
}
+
+ /**
+ * Gets the configured time interval that is used to force all MessageConsumers that have optimizedAcknowledge enabled
+ * to send an ack for any outstanding Message Acks. By default this value is set to zero meaning that the consumers
+ * will not do any background Message acknowledgment.
+ *
+ * @return the scheduledOptimizedAckInterval
+ */
+ public long getOptimizedAckScheduledAckInterval() {
+ return optimizedAckScheduledAckInterval;
+ }
+
+ /**
+ * Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that
+ * have been configured with optimizeAcknowledge enabled.
+ *
+ * @param scheduledOptimizedAckInterval the scheduledOptimizedAckInterval to set
+ */
+ public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
+ this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
+ }
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index 46b19aa4a6..af91da43de 100755
--- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -152,6 +152,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
private long optimizeAckTimestamp = System.currentTimeMillis();
private long optimizeAcknowledgeTimeOut = 0;
+ private long optimizedAckScheduledAckInterval = 0;
+ private Runnable optimizedAckTask;
private long failoverRedeliveryWaitPeriod = 0;
private boolean transactedIndividualAck = false;
private boolean nonBlockingRedelivery = false;
@@ -189,13 +191,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue();
if (physicalName.indexOf(connectionID) < 0) {
- throw new InvalidDestinationException(
- "Cannot use a Temporary destination from another Connection");
+ throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
}
if (session.connection.isDeleted(dest)) {
- throw new InvalidDestinationException(
- "Cannot use a Temporary destination that has been deleted");
+ throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
}
if (prefetch < 0) {
throw new JMSException("Cannot have a prefetch size less than zero");
@@ -258,7 +258,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
&& !info.isBrowser();
if (this.optimizeAcknowledge) {
this.optimizeAcknowledgeTimeOut = session.connection.getOptimizeAcknowledgeTimeOut();
+ setOptimizedAckScheduledAckInterval(session.connection.getOptimizedAckScheduledAckInterval());
}
+
this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod();
this.nonBlockingRedelivery = session.connection.isNonBlockingRedelivery();
@@ -415,8 +417,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
public void setMessageListener(MessageListener listener) throws JMSException {
checkClosed();
if (info.getPrefetchSize() == 0) {
- throw new JMSException(
- "Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
+ throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
}
if (listener != null) {
boolean wasRunning = session.isRunning();
@@ -551,7 +552,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
session.acknowledge();
}
});
- }else if (session.isIndividualAcknowledge()) {
+ } else if (session.isIndividualAcknowledge()) {
m.setAcknowledgeCallback(new Callback() {
public void execute() throws Exception {
session.checkClosed();
@@ -683,7 +684,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
this.session.asyncSendPacket(removeCommand);
if (interrupted) {
Thread.currentThread().interrupt();
- } }
+ }
+ }
void inProgressClearRequired() {
inProgressClearRequiredFlag.incrementAndGet();
@@ -772,6 +774,10 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
ThreadPoolUtils.shutdownGraceful(executorService, 60000L);
executorService = null;
}
+ if (optimizedAckTask != null) {
+ this.session.connection.getScheduler().cancel(optimizedAckTask);
+ optimizedAckTask = null;
+ }
if (session.isClientAcknowledge()) {
if (!this.info.isBrowser()) {
@@ -888,8 +894,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
if (!deliveredMessages.isEmpty()) {
if (optimizeAcknowledge) {
ackCounter++;
-
- // AMQ-3956 evaluate both expired and normal msgs as
+
+ // AMQ-3956 evaluate both expired and normal msgs as
// otherwise consumer may get stalled
if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
@@ -899,16 +905,16 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
session.sendAck(ack);
optimizeAckTimestamp = System.currentTimeMillis();
}
- // AMQ-3956 - as further optimization send
+ // AMQ-3956 - as further optimization send
// ack for expired msgs when there are any.
// This resets the deliveredCounter to 0 so that
// we won't sent standard acks with every msg just
- // because the deliveredCounter just below
+ // because the deliveredCounter just below
// 0.5 * prefetch as used in ackLater()
if (pendingAck != null && deliveredCounter > 0) {
- session.sendAck(pendingAck);
- pendingAck = null;
- deliveredCounter = 0;
+ session.sendAck(pendingAck);
+ pendingAck = null;
+ deliveredCounter = 0;
}
}
} else {
@@ -989,7 +995,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
}
}
- // AMQ-3956 evaluate both expired and normal msgs as
+ // AMQ-3956 evaluate both expired and normal msgs as
// otherwise consumer may get stalled
if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter + ackCounter - additionalWindowSize)) {
session.sendAck(pendingAck);
@@ -1471,4 +1477,55 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
public void setFailureError(IOException failureError) {
this.failureError = failureError;
}
+
+ /**
+ * @return the optimizedAckScheduledAckInterval
+ */
+ public long getOptimizedAckScheduledAckInterval() {
+ return optimizedAckScheduledAckInterval;
+ }
+
+ /**
+ * @param optimizedAckScheduledAckInterval the optimizedAckScheduledAckInterval to set
+ */
+ public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) throws JMSException {
+ this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
+
+ if (this.optimizedAckTask != null) {
+ try {
+ this.session.connection.getScheduler().cancel(optimizedAckTask);
+ } catch (JMSException e) {
+ LOG.debug("Caught exception while cancelling old optimized ack task", e);
+ throw e;
+ }
+ this.optimizedAckTask = null;
+ }
+
+ // Should we periodically send out all outstanding acks.
+ if (this.optimizeAcknowledge && this.optimizedAckScheduledAckInterval > 0) {
+ this.optimizedAckTask = new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ if (optimizeAcknowledge && !unconsumedMessages.isClosed()) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Consumer:{} is performing scheduled delivery of outstanding optimized Acks", info.getConsumerId());
+ }
+ deliverAcks();
+ }
+ } catch (Exception e) {
+ LOG.debug("Optimized Ack Task caught exception during ack", e);
+ }
+ }
+ };
+
+ try {
+ this.session.connection.getScheduler().executePeriodically(optimizedAckTask, optimizedAckScheduledAckInterval);
+ } catch (JMSException e) {
+ LOG.debug("Caught exception while scheduling new optimized ack task", e);
+ throw e;
+ }
+ }
+ }
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java b/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
index 859f93f472..f746138ed8 100755
--- a/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
+++ b/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
@@ -19,6 +19,7 @@ package org.apache.activemq.thread;
import java.util.HashMap;
import java.util.Timer;
import java.util.TimerTask;
+
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
@@ -54,7 +55,7 @@ public final class Scheduler extends ServiceSupport {
TimerTask ticket = timerTasks.remove(task);
if (ticket != null) {
ticket.cancel();
- timer.purge();//remove cancelled TimerTasks
+ timer.purge(); // remove cancelled TimerTasks
}
}
@@ -70,7 +71,6 @@ public final class Scheduler extends ServiceSupport {
@Override
protected synchronized void doStart() throws Exception {
this.timer = new Timer(name, true);
-
}
@Override
@@ -78,7 +78,6 @@ public final class Scheduler extends ServiceSupport {
if (this.timer != null) {
this.timer.cancel();
}
-
}
public String getName() {
diff --git a/activemq-core/src/test/java/org/apache/activemq/OptimizedAckTest.java b/activemq-core/src/test/java/org/apache/activemq/OptimizedAckTest.java
index e8df76eb20..687b60b208 100644
--- a/activemq-core/src/test/java/org/apache/activemq/OptimizedAckTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/OptimizedAckTest.java
@@ -16,10 +16,13 @@
*/
package org.apache.activemq;
+import java.util.concurrent.TimeUnit;
+
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
+
import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.util.Wait;
@@ -82,7 +85,6 @@ public class OptimizedAckTest extends TestSupport {
}
}
-
public void testVerySlowReceivedMessageStillInflight() throws Exception {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -119,4 +121,50 @@ public class OptimizedAckTest extends TestSupport {
}
}
}
+
+ public void testReceivedMessageNotInFlightAfterScheduledAckFires() throws Exception {
+ connection.setOptimizedAckScheduledAckInterval(TimeUnit.SECONDS.toMillis(10));
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("test");
+ MessageProducer producer = session.createProducer(queue);
+ for (int i = 0; i < 10; i++) {
+ producer.send(session.createTextMessage("Hello" + i));
+ }
+
+ final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker();
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount());
+ return 10 == regionBroker.getDestinationStatistics().getInflight().getCount();
+ }
+ }));
+
+ for (int i=0; i<10; i++) {
+ javax.jms.Message msg = consumer.receive(4000);
+ assertNotNull(msg);
+ if (i<7) {
+ assertEquals("all prefetch is still in flight", 10, regionBroker.getDestinationStatistics().getInflight().getCount());
+ } else {
+ assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition(){
+ @Override
+ public boolean isSatisified() throws Exception {
+ LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount());
+ return 3 == regionBroker.getDestinationStatistics().getInflight().getCount();
+ }
+ }));
+ }
+ }
+
+ assertTrue("After delay the scheduled ack should ack all inflight.", Wait.waitFor(new Wait.Condition(){
+ @Override
+ public boolean isSatisified() throws Exception {
+ LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount());
+ return 0 == regionBroker.getDestinationStatistics().getInflight().getCount();
+ }
+ }));
+ }
}