ARTEMIS-1121 Improving expiry scanner
https://issues.apache.org/jira/browse/ARTEMIS-1121
This commit is contained in:
parent
31d78eddf1
commit
1a39772489
|
@ -633,7 +633,8 @@ public class AMQPMessage extends RefCountMessage {
|
|||
|
||||
private synchronized void checkBuffer() {
|
||||
if (!bufferValid) {
|
||||
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1500);
|
||||
int estimated = Math.max(1500, data != null ? data.capacity() + 1000 : 0);
|
||||
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(estimated);
|
||||
try {
|
||||
getProtonMessage().encode(new NettyWritable(buffer));
|
||||
byte[] bytes = new byte[buffer.writerIndex()];
|
||||
|
|
|
@ -683,7 +683,7 @@ public class PagingStoreImpl implements PagingStore {
|
|||
if (pagingManager.isDiskFull()) {
|
||||
ActiveMQServerLogger.LOGGER.blockingDiskFull(address);
|
||||
} else {
|
||||
ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, sizeInBytes.get(), maxSize);
|
||||
ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, sizeInBytes.get(), maxSize, pagingManager.getGlobalSize());
|
||||
}
|
||||
blocking.set(true);
|
||||
}
|
||||
|
|
|
@ -944,7 +944,7 @@ public interface ActiveMQServerLogger extends BasicLogger {
|
|||
void errorExpiringReferencesNoBindings(SimpleString expiryAddress);
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 222147, value = "Message has expired. No expiry queue configured for queue {0} so dropping it", format = Message.Format.MESSAGE_FORMAT)
|
||||
@Message(id = 222147, value = "Messages are being expired on queue{0}. However there is no expiry queue configured, hence messages will be dropped.", format = Message.Format.MESSAGE_FORMAT)
|
||||
void errorExpiringReferencesNoQueue(SimpleString name);
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
|
@ -1104,8 +1104,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
|
|||
void missingClusterConfigForScaleDown(String scaleDownCluster);
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 222183, value = "Blocking message production on address ''{0}''; size is currently: {1} bytes; max-size-bytes: {2}", format = Message.Format.MESSAGE_FORMAT)
|
||||
void blockingMessageProduction(SimpleString addressName, long currentSize, long maxSize);
|
||||
@Message(id = 222183, value = "Blocking message production on address ''{0}''; size is currently: {1} bytes; max-size-bytes on address: {2}, global-max-size is {3}", format = Message.Format.MESSAGE_FORMAT)
|
||||
void blockingMessageProduction(SimpleString addressName, long currentSize, long maxSize, long globalMaxSize);
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 222184,
|
||||
|
|
|
@ -146,6 +146,8 @@ public class QueueImpl implements Queue {
|
|||
|
||||
private final LinkedListIterator<PagedReference> pageIterator;
|
||||
|
||||
private volatile boolean printErrorExpiring = false;
|
||||
|
||||
// Messages will first enter intermediateMessageReferences
|
||||
// Before they are added to messageReferences
|
||||
// This is to avoid locking the queue on the producer
|
||||
|
@ -1567,27 +1569,52 @@ public class QueueImpl implements Queue {
|
|||
if (queueDestroyed) {
|
||||
return;
|
||||
}
|
||||
logger.debug("Scanning for expires on " + QueueImpl.this.getName());
|
||||
|
||||
LinkedListIterator<MessageReference> iter = iterator();
|
||||
|
||||
boolean expired = false;
|
||||
boolean hasElements = false;
|
||||
|
||||
int elementsExpired = 0;
|
||||
try {
|
||||
boolean expired = false;
|
||||
boolean hasElements = false;
|
||||
Transaction tx = null;
|
||||
|
||||
while (postOffice.isStarted() && iter.hasNext()) {
|
||||
hasElements = true;
|
||||
MessageReference ref = iter.next();
|
||||
try {
|
||||
if (ref.getMessage().isExpired()) {
|
||||
if (tx == null) {
|
||||
tx = new TransactionImpl(storageManager);
|
||||
}
|
||||
incDelivering();
|
||||
expired = true;
|
||||
expire(ref);
|
||||
expire(tx, ref);
|
||||
iter.remove();
|
||||
refRemoved(ref);
|
||||
|
||||
if (++elementsExpired >= MAX_DELIVERIES_IN_LOOP) {
|
||||
logger.debug("Breaking loop of expiring");
|
||||
scannerRunning.incrementAndGet();
|
||||
getExecutor().execute(this);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.errorExpiringReferencesOnQueue(e, ref);
|
||||
}
|
||||
}
|
||||
|
||||
logger.debug("Expired " + elementsExpired + " references");
|
||||
|
||||
try {
|
||||
if (tx != null) {
|
||||
tx.commit();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
}
|
||||
|
||||
// If empty we need to schedule depaging to make sure we would depage expired messages as well
|
||||
|
@ -1600,6 +1627,8 @@ public class QueueImpl implements Queue {
|
|||
} catch (Throwable ignored) {
|
||||
}
|
||||
scannerRunning.decrementAndGet();
|
||||
logger.debug("Scanning for expires on " + QueueImpl.this.getName() + " done");
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1912,7 +1941,6 @@ public class QueueImpl implements Queue {
|
|||
return "QueueImpl[name=" + name.toString() + ", postOffice=" + this.postOffice + ", temp=" + this.temporary + "]@" + Integer.toHexString(System.identityHashCode(this));
|
||||
}
|
||||
|
||||
|
||||
private synchronized void internalAddTail(final MessageReference ref) {
|
||||
refAdded(ref);
|
||||
messageReferences.addTail(ref, getPriority(ref));
|
||||
|
@ -2519,7 +2547,11 @@ public class QueueImpl implements Queue {
|
|||
move(expiryAddress, tx, ref, true, true);
|
||||
}
|
||||
} else {
|
||||
ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoQueue(name);
|
||||
if (!printErrorExpiring) {
|
||||
printErrorExpiring = true;
|
||||
// print this only once
|
||||
ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoQueue(name);
|
||||
}
|
||||
|
||||
acknowledge(tx, ref);
|
||||
}
|
||||
|
@ -3015,7 +3047,7 @@ public class QueueImpl implements Queue {
|
|||
if (messagesIterator != null && messagesIterator.hasNext()) {
|
||||
MessageReference msg = messagesIterator.next();
|
||||
if (msg.isPaged()) {
|
||||
previouslyBrowsed.add(((PagedReference)msg).getPosition());
|
||||
previouslyBrowsed.add(((PagedReference) msg).getPosition());
|
||||
}
|
||||
return msg;
|
||||
} else {
|
||||
|
@ -3156,7 +3188,7 @@ public class QueueImpl implements Queue {
|
|||
if (consumersSet.size() == 0) {
|
||||
logger.debug("There are no consumers, no need to check slow consumer's rate");
|
||||
return;
|
||||
} else if (queueRate < (threshold * consumersSet.size())) {
|
||||
} else if (queueRate < (threshold * consumersSet.size())) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer.");
|
||||
}
|
||||
|
|
|
@ -172,6 +172,23 @@
|
|||
<configuration>${basedir}/target/classes/servers/replicated-static1</configuration>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<phase>test-compile</phase>
|
||||
<id>create-expire</id>
|
||||
<goals>
|
||||
<goal>create</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<!-- this makes it easier in certain envs -->
|
||||
<configuration>${basedir}/target/classes/servers/expire</configuration>
|
||||
<javaOptions>-Dartemis.debug.paging.interval=1</javaOptions>
|
||||
<allowAnonymous>true</allowAnonymous>
|
||||
<user>admin</user>
|
||||
<password>admin</password>
|
||||
<instance>${basedir}/target/expire</instance>
|
||||
</configuration>
|
||||
</execution>
|
||||
|
||||
</executions>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
|
|
|
@ -0,0 +1,184 @@
|
|||
<?xml version='1.0'?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
<configuration xmlns="urn:activemq"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
|
||||
|
||||
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="urn:activemq:core ">
|
||||
|
||||
<name>0.0.0.0</name>
|
||||
|
||||
<persistence-enabled>true</persistence-enabled>
|
||||
|
||||
<!-- this could be ASYNCIO or NIO
|
||||
-->
|
||||
<journal-type>NIO</journal-type>
|
||||
|
||||
<paging-directory>./data/paging</paging-directory>
|
||||
|
||||
<bindings-directory>./data/bindings</bindings-directory>
|
||||
|
||||
<journal-directory>./data/journal</journal-directory>
|
||||
|
||||
<large-messages-directory>./data/large-messages</large-messages-directory>
|
||||
|
||||
<journal-datasync>true</journal-datasync>
|
||||
|
||||
<journal-min-files>2</journal-min-files>
|
||||
|
||||
<journal-pool-files>-1</journal-pool-files>
|
||||
|
||||
<message-expiry-scan-period>1000</message-expiry-scan-period>
|
||||
|
||||
<!--
|
||||
You can specify the NIC you want to use to verify if the network
|
||||
<network-check-NIC>theNickName</network-check-NIC>
|
||||
-->
|
||||
|
||||
<!--
|
||||
Use this to use an HTTP server to validate the network
|
||||
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
|
||||
|
||||
<!-- <network-check-period>10000</network-check-period> -->
|
||||
<!-- <network-check-timeout>1000</network-check-timeout> -->
|
||||
|
||||
<!-- this is a comma separated list, no spaces, just DNS or IPs
|
||||
it should accept IPV6
|
||||
|
||||
Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
|
||||
Using IPs that could eventually disappear or be partially visible may defeat the purpose.
|
||||
You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running -->
|
||||
<!-- <network-check-list>10.0.0.1</network-check-list> -->
|
||||
|
||||
<!-- use this to customize the ping used for ipv4 addresses -->
|
||||
<!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->
|
||||
|
||||
<!-- use this to customize the ping used for ipv6 addresses -->
|
||||
<!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->
|
||||
|
||||
|
||||
|
||||
|
||||
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
|
||||
<disk-scan-period>5000</disk-scan-period>
|
||||
|
||||
<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
|
||||
that won't support flow control. -->
|
||||
<max-disk-usage>90</max-disk-usage>
|
||||
|
||||
<!-- the system will enter into page mode once you hit this limit.
|
||||
This is an estimate in bytes of how much the messages are using in memory
|
||||
|
||||
The system will use half of the available memory (-Xmx) by default for the global-max-size.
|
||||
You may specify a different value here if you need to customize it to your needs.
|
||||
|
||||
<global-max-size>100Mb</global-max-size>
|
||||
|
||||
-->
|
||||
|
||||
<acceptors>
|
||||
|
||||
<!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
|
||||
<!-- amqpCredits: The number of credits sent to AMQP producers -->
|
||||
<!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
|
||||
|
||||
<!-- Acceptor for every supported protocol -->
|
||||
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
|
||||
|
||||
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
|
||||
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300</acceptor>
|
||||
|
||||
<!-- STOMP Acceptor. -->
|
||||
<acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
|
||||
|
||||
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
|
||||
<acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
|
||||
|
||||
<!-- MQTT Acceptor -->
|
||||
<acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
|
||||
|
||||
</acceptors>
|
||||
|
||||
|
||||
<security-settings>
|
||||
<security-setting match="#">
|
||||
<permission type="createNonDurableQueue" roles="guest"/>
|
||||
<permission type="deleteNonDurableQueue" roles="guest"/>
|
||||
<permission type="createDurableQueue" roles="guest"/>
|
||||
<permission type="deleteDurableQueue" roles="guest"/>
|
||||
<permission type="createAddress" roles="guest"/>
|
||||
<permission type="deleteAddress" roles="guest"/>
|
||||
<permission type="consume" roles="guest"/>
|
||||
<permission type="browse" roles="guest"/>
|
||||
<permission type="send" roles="guest"/>
|
||||
<!-- we need this otherwise ./artemis data imp wouldn't work -->
|
||||
<permission type="manage" roles="guest"/>
|
||||
</security-setting>
|
||||
</security-settings>
|
||||
|
||||
<address-settings>
|
||||
<!-- if you define auto-create on certain queues, management has to be auto-create -->
|
||||
<address-setting match="activemq.management#">
|
||||
<dead-letter-address>DLQ</dead-letter-address>
|
||||
<expiry-address>ExpiryQueue</expiry-address>
|
||||
<redelivery-delay>0</redelivery-delay>
|
||||
<!-- with -1 only the global-max-size is in use for limiting -->
|
||||
<max-size-bytes>-1</max-size-bytes>
|
||||
<message-counter-history-day-limit>10</message-counter-history-day-limit>
|
||||
<address-full-policy>PAGE</address-full-policy>
|
||||
<auto-create-queues>true</auto-create-queues>
|
||||
<auto-create-addresses>true</auto-create-addresses>
|
||||
<auto-create-jms-queues>true</auto-create-jms-queues>
|
||||
<auto-create-jms-topics>true</auto-create-jms-topics>
|
||||
</address-setting>
|
||||
<!--default for catch all-->
|
||||
<address-setting match="#">
|
||||
<dead-letter-address>DLQ</dead-letter-address>
|
||||
<expiry-address>ExpiryQueue</expiry-address>
|
||||
<redelivery-delay>0</redelivery-delay>
|
||||
<!-- with -1 only the global-max-size is in use for limiting -->
|
||||
<max-size-bytes>-1</max-size-bytes>
|
||||
<message-counter-history-day-limit>10</message-counter-history-day-limit>
|
||||
<address-full-policy>PAGE</address-full-policy>
|
||||
<auto-create-queues>true</auto-create-queues>
|
||||
<auto-create-addresses>true</auto-create-addresses>
|
||||
<auto-create-jms-queues>true</auto-create-jms-queues>
|
||||
<auto-create-jms-topics>true</auto-create-jms-topics>
|
||||
</address-setting>
|
||||
</address-settings>
|
||||
|
||||
<addresses>
|
||||
<address name="DLQ">
|
||||
<anycast>
|
||||
<queue name="DLQ" />
|
||||
</anycast>
|
||||
</address>
|
||||
<address name="ExpiryQueue">
|
||||
<anycast>
|
||||
<queue name="ExpiryQueue" />
|
||||
</anycast>
|
||||
</address>
|
||||
|
||||
</addresses>
|
||||
|
||||
</core>
|
||||
</configuration>
|
|
@ -0,0 +1,92 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.tests.smoke.expire;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
|
||||
import org.apache.qpid.jms.JmsConnectionFactory;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestSimpleExpire extends SmokeTestBase {
|
||||
|
||||
public static final String SERVER_NAME_0 = "expire";
|
||||
|
||||
@Before
|
||||
public void before() throws Exception {
|
||||
cleanupData(SERVER_NAME_0);
|
||||
disableCheckThread();
|
||||
startServer(SERVER_NAME_0, 0, 30000);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendExpire() throws Exception {
|
||||
ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616");
|
||||
Connection connection = factory.createConnection();
|
||||
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
|
||||
Queue queue = session.createQueue("q0");
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||
|
||||
producer.setTimeToLive(1000);
|
||||
for (int i = 0; i < 20000; i++) {
|
||||
producer.send(session.createTextMessage("expired"));
|
||||
if (i % 5000 == 0) {
|
||||
session.commit();
|
||||
System.out.println("Sent " + i + " + messages");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
session.commit();
|
||||
|
||||
Thread.sleep(5000);
|
||||
producer.setTimeToLive(0);
|
||||
for (int i = 0; i < 500; i++) {
|
||||
producer.send(session.createTextMessage("ok"));
|
||||
|
||||
}
|
||||
session.commit();
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
connection.start();
|
||||
|
||||
|
||||
for (int i = 0; i < 500; i++) {
|
||||
TextMessage txt = (TextMessage) consumer.receive(10000);
|
||||
Assert.assertNotNull(txt);
|
||||
Assert.assertEquals("ok", txt.getText());
|
||||
}
|
||||
|
||||
session.commit();
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue