AMQ-6979 - use scheduler as trigger task - do heavy lifting via the task runner executor, take care to trap errors to keep scheduler timer alive AMQ-5129

This commit is contained in:
gtully 2018-05-30 22:03:50 +01:00
parent 20ec044c41
commit cdb38b3275
6 changed files with 144 additions and 5 deletions

View File

@ -2159,13 +2159,13 @@ public class BrokerService implements Service {
public void run() {
try {
checkStoreUsageLimits();
} catch (Exception e) {
} catch (Throwable e) {
LOG.error("Failed to check persistent disk usage limits", e);
}
try {
checkTmpStoreUsageLimits();
} catch (Exception e) {
} catch (Throwable e) {
LOG.error("Failed to check temporary store usage limits", e);
}
}

View File

@ -39,6 +39,7 @@ import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
@ -144,10 +145,21 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
asyncWakeup();
}
};
private final Runnable expireMessagesTask = new Runnable() {
private final AtomicBoolean expiryTaskInProgress = new AtomicBoolean(false);
private final Runnable expireMessagesWork = new Runnable() {
@Override
public void run() {
expireMessages();
expiryTaskInProgress.set(false);
}
};
private final Runnable expireMessagesTask = new Runnable() {
@Override
public void run() {
if (expiryTaskInProgress.compareAndSet(false, true)) {
taskFactory.execute(expireMessagesWork);
}
}
};

View File

@ -28,6 +28,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.InvalidClientIDException;
@ -112,10 +113,26 @@ public class RegionBroker extends EmptyBroker {
private boolean allowTempAutoCreationOnSend;
private final ReentrantReadWriteLock inactiveDestinationsPurgeLock = new ReentrantReadWriteLock();
private final TaskRunnerFactory taskRunnerFactory;
private final AtomicBoolean purgeInactiveDestinationsTaskInProgress = new AtomicBoolean(false);
private final Runnable purgeInactiveDestinationsTask = new Runnable() {
@Override
public void run() {
if (purgeInactiveDestinationsTaskInProgress.compareAndSet(false, true)) {
taskRunnerFactory.execute(purgeInactiveDestinationsWork);
}
}
};
private final Runnable purgeInactiveDestinationsWork = new Runnable() {
@Override
public void run() {
try {
purgeInactiveDestinations();
} catch (Throwable ignored) {
LOG.error("Unexpected exception on purgeInactiveDestinations {}", this, ignored);
} finally {
purgeInactiveDestinationsTaskInProgress.set(false);
}
}
};
@ -134,6 +151,7 @@ public class RegionBroker extends EmptyBroker {
this.destinationInterceptor = destinationInterceptor;
tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
this.taskRunnerFactory = taskRunnerFactory;
}
@Override

View File

@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.advisory.AdvisorySupport;
@ -75,6 +76,7 @@ public class Topic extends BaseDestination implements Task {
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
private final ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
private final TaskRunner taskRunner;
private final TaskRunnerFactory taskRunnerFactor;
private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
@Override
@ -92,6 +94,7 @@ public class Topic extends BaseDestination implements Task {
this.topicStore = store;
subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null);
this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName());
this.taskRunnerFactor = taskFactory;
}
@Override
@ -787,11 +790,21 @@ public class Topic extends BaseDestination implements Task {
}
}
private final Runnable expireMessagesTask = new Runnable() {
private final AtomicBoolean expiryTaskInProgress = new AtomicBoolean(false);
private final Runnable expireMessagesWork = new Runnable() {
@Override
public void run() {
List<Message> browsedMessages = new InsertionCountList<Message>();
doBrowse(browsedMessages, getMaxExpirePageSize());
expiryTaskInProgress.set(false);
}
};
private final Runnable expireMessagesTask = new Runnable() {
@Override
public void run() {
if (expiryTaskInProgress.compareAndSet(false, true)) {
taskRunnerFactor.execute(expireMessagesWork);
}
}
};

View File

@ -21,8 +21,12 @@ import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.broker.region.policy.PolicyEntry;
@ -39,6 +43,7 @@ import org.slf4j.LoggerFactory;
import javax.jms.*;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.activemq.TestSupport.getDestination;
@ -150,6 +155,63 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
}
public void testReceiveTimeoutRespectedWithExpiryProcessing() throws Exception {
final ActiveMQDestination destination = new ActiveMQQueue("test");
broker = new BrokerService();
broker.setBrokerName("localhost");
broker.setDestinations(new ActiveMQDestination[]{destination});
broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
PolicyEntry defaultPolicy = new PolicyEntry();
defaultPolicy.setExpireMessagesPeriod(1000);
defaultPolicy.setMaxExpirePageSize(2000);
PolicyMap policyMap = new PolicyMap();
policyMap.setDefaultEntry(defaultPolicy);
broker.setDestinationPolicy(policyMap);
broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
broker.addConnector("tcp://localhost:0");
broker.setPlugins(new BrokerPlugin[] { new BrokerPluginSupport() {
@Override
public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription, Throwable poisonCause) {
try {
LOG.info("Sleeping before delegation on sendToDeadLetterQueue");
TimeUnit.SECONDS.sleep(1);
} catch (Exception ignored) {}
return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
}
}});
broker.start();
broker.waitUntilStarted();
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
"vm://localhost");
ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
prefetchPolicy.setAll(0);
factory.setPrefetchPolicy(prefetchPolicy);
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(destination);
producer.setTimeToLive(1000);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i=0;i<10; i++) {
producer.send(session.createTextMessage("RTR"), DeliveryMode.PERSISTENT, 0, 2000);
}
consumer = session.createConsumer(new ActiveMQQueue("another-test"));
for (int i=0; i<10; i++) {
long timeStamp = System.currentTimeMillis();
consumer.receive(1000);
long duration = System.currentTimeMillis() - timeStamp;
LOG.info("Duration: " + i + " : " + duration);
assertTrue("Delay about 500: " + i + ", actual: " + duration, duration < 1500);
}
}
private void produce(int num, ActiveMQDestination destination) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
"failover://"+brokerUri);

View File

@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@ -35,6 +36,8 @@ import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.After;
import org.junit.Assert;
@ -256,6 +259,30 @@ public class TopicSubscriptionZeroPrefetchTest {
Assert.assertNotNull("should have received a message the published message", consumedMessage);
}
@Test(timeout = 420000)
public void testReceiveTimeoutRespectedWithExpiryProcessing() throws Exception {
ActiveMQTopic consumerDestination = new ActiveMQTopic(getTopicName() + "?consumer.prefetchSize=0");
for (int i=0; i<500; i++) {
consumer = session.createDurableSubscriber(consumerDestination, "mysub-" + i);
consumer.close();
}
for (int i=0;i<1000; i++) {
producer.send(session.createTextMessage("RTR"), DeliveryMode.PERSISTENT, 0, 5000);
}
consumer = session.createDurableSubscriber(consumerDestination, "mysub3");
for (int i=0; i<10; i++) {
long timeStamp = System.currentTimeMillis();
consumer.receive(1000);
long duration = System.currentTimeMillis() - timeStamp;
LOG.info("Duration: " + i + " : " + duration);
assertTrue("Delay about 500: " + i, duration < 1500);
}
}
@After
public void tearDown() throws Exception {
consumer.close();
@ -272,6 +299,13 @@ public class TopicSubscriptionZeroPrefetchTest {
broker.setUseJmx(false);
broker.setDeleteAllMessagesOnStartup(true);
broker.addConnector("vm://localhost");
PolicyEntry policyEntry = new PolicyEntry();
policyEntry.setExpireMessagesPeriod(5000);
policyEntry.setMaxExpirePageSize(2000);
policyEntry.setUseCache(false);
PolicyMap policyMap = new PolicyMap();
policyMap.setDefaultEntry(policyEntry);
broker.setDestinationPolicy(policyMap);
broker.start();
broker.waitUntilStarted();
return broker;