From 8b23e072eeab2beebf62fd267bf8d9f88d05b5c2 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Tue, 8 Mar 2016 13:37:58 -0500 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5785 Avoid holding the intrinsic lock on the cursor when expiring the messages. --- .../cursors/FilePendingMessageCursor.java | 40 +++++++++++++------ 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java index 9c9a8e7cae..3c2bd5f84a 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -17,10 +17,13 @@ package org.apache.activemq.broker.region.cursors; import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; + import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; @@ -31,25 +34,26 @@ import org.apache.activemq.command.Message; import org.apache.activemq.filter.NonCachedMessageEvaluationContext; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.store.PList; -import org.apache.activemq.store.PListStore; import org.apache.activemq.store.PListEntry; +import org.apache.activemq.store.PListStore; import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.Usage; import org.apache.activemq.usage.UsageListener; +import org.apache.activemq.util.ByteSequence; import org.apache.activemq.wireformat.WireFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.activemq.util.ByteSequence; /** * persist pending messages pending message (messages awaiting dispatch to a * consumer) cursor - * - * */ public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener { + static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursor.class); + private static final AtomicLong NAME_COUNT = new AtomicLong(); + protected Broker broker; private final PListStore store; private final String name; @@ -61,6 +65,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple private boolean flushRequired; private final AtomicBoolean started = new AtomicBoolean(); private final WireFormat wireFormat = new OpenWireFormat(); + /** * @param broker * @param name @@ -374,9 +379,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple @Override public synchronized boolean isFull() { - return super.isFull() || (!isDiskListEmpty() && systemUsage != null && systemUsage.getTempUsage().isFull()); - } @Override @@ -392,11 +395,12 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple @Override public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) { if (newPercentUsage >= getMemoryUsageHighWaterMark()) { + List expiredMessages = null; synchronized (this) { if (!flushRequired && size() != 0) { flushRequired =true; if (!iterating) { - expireOldMessages(); + expiredMessages = expireOldMessages(); if (!hasSpace()) { flushToDisk(); flushRequired = false; @@ -404,6 +408,12 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple } } } + + if (expiredMessages != null) { + for (MessageReference node : expiredMessages) { + discardExpiredMessage(node); + } + } } } @@ -412,26 +422,30 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple return true; } - protected synchronized void expireOldMessages() { + private synchronized List expireOldMessages() { + List expired = new ArrayList(); if (!memoryList.isEmpty()) { for (Iterator iterator = memoryList.iterator(); iterator.hasNext();) { MessageReference node = iterator.next(); if (node.isExpired()) { node.decrementReferenceCount(); - discardExpiredMessage(node); + expired.add(node); iterator.remove(); } } } + + return expired; } protected synchronized void flushToDisk() { if (!memoryList.isEmpty() && store != null) { long start = 0; - if (LOG.isTraceEnabled()) { + if (LOG.isTraceEnabled()) { start = System.currentTimeMillis(); - LOG.trace("{}, flushToDisk() mem list size: {} {}", new Object[]{ name, memoryList.size(), (systemUsage != null ? systemUsage.getMemoryUsage() : "") }); - } + LOG.trace("{}, flushToDisk() mem list size: {} {}", new Object[] { name, memoryList.size(), + (systemUsage != null ? systemUsage.getMemoryUsage() : "") }); + } for (Iterator iterator = memoryList.iterator(); iterator.hasNext();) { MessageReference node = iterator.next(); node.decrementReferenceCount();