mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-5274 - we now only check expiry on non inflight messages so there is on contention on ack with the periodic expriy check thread - related https://issues.apache.org/jira/browse/AMQ-2876
This commit is contained in:
parent
b1ede0559e
commit
26807cd452
|
@ -1169,7 +1169,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
||||||
List<MessageReference> toExpire) throws Exception {
|
List<MessageReference> toExpire) throws Exception {
|
||||||
for (Iterator<? extends MessageReference> i = refs.iterator(); i.hasNext() && l.size() < max;) {
|
for (Iterator<? extends MessageReference> i = refs.iterator(); i.hasNext() && l.size() < max;) {
|
||||||
QueueMessageReference ref = (QueueMessageReference) i.next();
|
QueueMessageReference ref = (QueueMessageReference) i.next();
|
||||||
if (ref.isExpired()) {
|
if (ref.isExpired() && (ref.getLockOwner() == null)) {
|
||||||
toExpire.add(ref);
|
toExpire.add(ref);
|
||||||
} else if (l.contains(ref.getMessage()) == false) {
|
} else if (l.contains(ref.getMessage()) == false) {
|
||||||
l.add(ref.getMessage());
|
l.add(ref.getMessage());
|
||||||
|
|
|
@ -52,14 +52,6 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
|
||||||
final Destination q = (Destination) n.getRegionDestination();
|
final Destination q = (Destination) n.getRegionDestination();
|
||||||
final QueueMessageReference node = (QueueMessageReference)n;
|
final QueueMessageReference node = (QueueMessageReference)n;
|
||||||
final Queue queue = (Queue)q;
|
final Queue queue = (Queue)q;
|
||||||
|
|
||||||
if (n.isExpired()) {
|
|
||||||
// sync with message expiry processing
|
|
||||||
if (!broker.isExpired(n)) {
|
|
||||||
LOG.debug("ignoring ack {}, for already expired message: {}", ack, n);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
queue.removeMessage(context, this, node, ack);
|
queue.removeMessage(context, this, node, ack);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -30,6 +30,10 @@ import javax.jms.MessageProducer;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
import javax.jms.Topic;
|
import javax.jms.Topic;
|
||||||
|
|
||||||
|
import org.apache.activemq.broker.BrokerRegistry;
|
||||||
|
import org.apache.activemq.broker.region.DestinationStatistics;
|
||||||
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
import org.apache.activemq.util.Wait;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -151,7 +155,7 @@ public class JmsSendReceiveWithMessageExpirationTest extends TestSupport {
|
||||||
received.acknowledge();
|
received.acknowledge();
|
||||||
};
|
};
|
||||||
|
|
||||||
assertEquals("got messages", messageCount + 1, messages.size());
|
assertEquals("got all (normal plus one with ttl) messages", messageCount + 1, messages.size());
|
||||||
|
|
||||||
Vector<Message> dlqMessages = new Vector<Message>();
|
Vector<Message> dlqMessages = new Vector<Message>();
|
||||||
while ((received = dlqConsumer.receive(1000)) != null) {
|
while ((received = dlqConsumer.receive(1000)) != null) {
|
||||||
|
@ -159,6 +163,21 @@ public class JmsSendReceiveWithMessageExpirationTest extends TestSupport {
|
||||||
};
|
};
|
||||||
|
|
||||||
assertEquals("got dlq messages", data.length - 1, dlqMessages.size());
|
assertEquals("got dlq messages", data.length - 1, dlqMessages.size());
|
||||||
|
|
||||||
|
final DestinationStatistics view = getDestinationStatistics(BrokerRegistry.getInstance().findFirst(), ActiveMQDestination.transform(consumerDestination));
|
||||||
|
|
||||||
|
// wait for all to inflight to expire
|
||||||
|
assertTrue("all inflight messages expired ", Wait.waitFor(new Wait.Condition() {
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return view.getInflight().getCount() == 0;
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
assertEquals("Wrong inFlightCount: ", 0, view.getInflight().getCount());
|
||||||
|
|
||||||
|
LOG.info("Stats: received: " + messages.size() + ", messages: " + view.getMessages().getCount() + ", enqueues: " + view.getEnqueues().getCount() + ", dequeues: " + view.getDequeues().getCount()
|
||||||
|
+ ", dispatched: " + view.getDispatched().getCount() + ", inflight: " + view.getInflight().getCount() + ", expired: " + view.getExpired().getCount());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -0,0 +1,133 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.bugs;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.RedeliveryPolicy;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
|
||||||
|
import org.apache.activemq.broker.jmx.QueueViewMBean;
|
||||||
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
public class AMQ5274Test {
|
||||||
|
static Logger LOG = LoggerFactory.getLogger(AMQ5274Test.class);
|
||||||
|
String activemqURL;
|
||||||
|
BrokerService brokerService;
|
||||||
|
ActiveMQQueue dest = new ActiveMQQueue("TestQ");
|
||||||
|
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void startBroker() throws Exception {
|
||||||
|
brokerService = new BrokerService();
|
||||||
|
brokerService.setPersistent(false);
|
||||||
|
PolicyMap policyMap = new PolicyMap();
|
||||||
|
PolicyEntry defaultPolicy = new PolicyEntry();
|
||||||
|
defaultPolicy.setExpireMessagesPeriod(1000);
|
||||||
|
policyMap.setDefaultEntry(defaultPolicy);
|
||||||
|
brokerService.setDestinationPolicy(policyMap);
|
||||||
|
activemqURL = brokerService.addConnector("tcp://localhost:0").getPublishableConnectString();
|
||||||
|
brokerService.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void stopBroker() throws Exception {
|
||||||
|
if (brokerService != null) {
|
||||||
|
brokerService.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test() throws Exception {
|
||||||
|
LOG.info("Starting Test");
|
||||||
|
assertTrue(brokerService.isStarted());
|
||||||
|
|
||||||
|
produce();
|
||||||
|
consumeAndRollback();
|
||||||
|
|
||||||
|
// check reported queue size using JMX
|
||||||
|
long queueSize = getQueueSize();
|
||||||
|
assertEquals("Queue " + dest.getPhysicalName() + " not empty, reporting " + queueSize + " messages.", 0, queueSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void consumeAndRollback() throws JMSException, InterruptedException {
|
||||||
|
ActiveMQConnection connection = createConnection();
|
||||||
|
RedeliveryPolicy noRedelivery = new RedeliveryPolicy();
|
||||||
|
noRedelivery.setMaximumRedeliveries(0);
|
||||||
|
connection.setRedeliveryPolicy(noRedelivery);
|
||||||
|
connection.start();
|
||||||
|
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
MessageConsumer consumer = session.createConsumer(dest);
|
||||||
|
Message m;
|
||||||
|
while ( (m = consumer.receive(4000)) != null) {
|
||||||
|
LOG.info("Got:" + m);
|
||||||
|
TimeUnit.SECONDS.sleep(1);
|
||||||
|
session.rollback();
|
||||||
|
}
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void produce() throws Exception {
|
||||||
|
Connection connection = createConnection();
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageProducer producer = session.createProducer(dest);
|
||||||
|
producer.setTimeToLive(10000);
|
||||||
|
for (int i=0;i<20;i++) {
|
||||||
|
producer.send(session.createTextMessage("i="+i));
|
||||||
|
}
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private ActiveMQConnection createConnection() throws JMSException {
|
||||||
|
return (ActiveMQConnection) new ActiveMQConnectionFactory(activemqURL).createConnection();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public long getQueueSize() throws Exception {
|
||||||
|
long queueSize = 0;
|
||||||
|
try {
|
||||||
|
QueueViewMBean queueViewMBean = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(BrokerMBeanSupport.createDestinationName(brokerService.getBrokerObjectName(), dest), QueueViewMBean.class, false);
|
||||||
|
queueSize = queueViewMBean.getQueueSize();
|
||||||
|
LOG.info("QueueSize for destination {} is {}", dest, queueSize);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
LOG.error("Error retrieving QueueSize from JMX ", ex);
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
return queueSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -190,7 +190,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
|
||||||
|
|
||||||
// memory check
|
// memory check
|
||||||
assertEquals("memory usage is back to duck egg", 0, getDestination(broker, destination).getMemoryUsage().getPercentUsage());
|
assertEquals("memory usage is back to duck egg", 0, getDestination(broker, destination).getMemoryUsage().getPercentUsage());
|
||||||
assertTrue("memory usage is increased ", 0 < getDestination(broker, dlqDestination).getMemoryUsage().getPercentUsage());
|
assertTrue("memory usage is increased ", 0 < getDestination(broker, dlqDestination).getMemoryUsage().getUsage());
|
||||||
|
|
||||||
// verify DLQ
|
// verify DLQ
|
||||||
MessageConsumer dlqConsumer = createDlqConsumer(connection);
|
MessageConsumer dlqConsumer = createDlqConsumer(connection);
|
||||||
|
|
|
@ -257,7 +257,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
|
||||||
// first ack delivered after expiry
|
// first ack delivered after expiry
|
||||||
public void testExpiredMessagesWithVerySlowConsumer() throws Exception {
|
public void testExpiredMessagesWithVerySlowConsumer() throws Exception {
|
||||||
createBroker();
|
createBroker();
|
||||||
final long queuePrefetch = 600;
|
final long queuePrefetch = 5;
|
||||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
|
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
|
||||||
connectionUri + "?jms.prefetchPolicy.queuePrefetch=" + queuePrefetch);
|
connectionUri + "?jms.prefetchPolicy.queuePrefetch=" + queuePrefetch);
|
||||||
connection = factory.createConnection();
|
connection = factory.createConnection();
|
||||||
|
@ -266,7 +266,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
|
||||||
final int ttl = 4000;
|
final int ttl = 4000;
|
||||||
producer.setTimeToLive(ttl);
|
producer.setTimeToLive(ttl);
|
||||||
|
|
||||||
final long sendCount = 1500;
|
final long sendCount = 10;
|
||||||
final CountDownLatch receivedOneCondition = new CountDownLatch(1);
|
final CountDownLatch receivedOneCondition = new CountDownLatch(1);
|
||||||
final CountDownLatch waitCondition = new CountDownLatch(1);
|
final CountDownLatch waitCondition = new CountDownLatch(1);
|
||||||
|
|
||||||
|
@ -328,10 +328,14 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
|
||||||
return queuePrefetch == view.getDispatchCount();
|
return queuePrefetch == view.getDispatchCount();
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
assertTrue("Not all sent have expired ", Wait.waitFor(new Wait.Condition() {
|
assertTrue("all non inflight have expired ", Wait.waitFor(new Wait.Condition() {
|
||||||
@Override
|
@Override
|
||||||
public boolean isSatisified() throws Exception {
|
public boolean isSatisified() throws Exception {
|
||||||
return sendCount == view.getExpiredCount();
|
LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
|
||||||
|
+ ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
|
||||||
|
+ ", size= " + view.getQueueSize());
|
||||||
|
|
||||||
|
return view.getExpiredCount() > 0 && (view.getEnqueueCount() - view.getInFlightCount()) == view.getExpiredCount();
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
|
|
||||||
|
@ -448,10 +452,15 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
|
||||||
return queuePrefetch == view.getDispatchCount();
|
return queuePrefetch == view.getDispatchCount();
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
assertTrue("All have not sent have expired ", Wait.waitFor(new Wait.Condition() {
|
|
||||||
|
assertTrue("all non inflight have expired ", Wait.waitFor(new Wait.Condition() {
|
||||||
@Override
|
@Override
|
||||||
public boolean isSatisified() throws Exception {
|
public boolean isSatisified() throws Exception {
|
||||||
return sendCount == view.getExpiredCount();
|
LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
|
||||||
|
+ ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
|
||||||
|
+ ", size= " + view.getQueueSize());
|
||||||
|
|
||||||
|
return view.getExpiredCount() > 0 && (view.getEnqueueCount() - view.getInFlightCount()) == view.getExpiredCount();
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue