Avoid holding the intrinsic lock on the cursor when expiring the
messages.
This commit is contained in:
Timothy Bish 2016-03-08 13:37:58 -05:00
parent 078f39f589
commit 8b23e072ee
1 changed files with 27 additions and 13 deletions

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -17,10 +17,13 @@
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
@ -31,25 +34,26 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.PList;
import org.apache.activemq.store.PListStore;
import org.apache.activemq.store.PListEntry;
import org.apache.activemq.store.PListStore;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.activemq.util.ByteSequence;
/**
* persist pending messages pending message (messages awaiting dispatch to a
* consumer) cursor
*
*
*/
public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener {
static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursor.class);
private static final AtomicLong NAME_COUNT = new AtomicLong();
protected Broker broker;
private final PListStore store;
private final String name;
@ -61,6 +65,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
private boolean flushRequired;
private final AtomicBoolean started = new AtomicBoolean();
private final WireFormat wireFormat = new OpenWireFormat();
/**
* @param broker
* @param name
@ -374,9 +379,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
@Override
public synchronized boolean isFull() {
return super.isFull() || (!isDiskListEmpty() && systemUsage != null && systemUsage.getTempUsage().isFull());
}
@Override
@ -392,11 +395,12 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
@Override
public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
List<MessageReference> expiredMessages = null;
synchronized (this) {
if (!flushRequired && size() != 0) {
flushRequired =true;
if (!iterating) {
expireOldMessages();
expiredMessages = expireOldMessages();
if (!hasSpace()) {
flushToDisk();
flushRequired = false;
@ -404,6 +408,12 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
}
}
}
if (expiredMessages != null) {
for (MessageReference node : expiredMessages) {
discardExpiredMessage(node);
}
}
}
}
@ -412,26 +422,30 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
return true;
}
protected synchronized void expireOldMessages() {
private synchronized List<MessageReference> expireOldMessages() {
List<MessageReference> expired = new ArrayList<MessageReference>();
if (!memoryList.isEmpty()) {
for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
MessageReference node = iterator.next();
if (node.isExpired()) {
node.decrementReferenceCount();
discardExpiredMessage(node);
expired.add(node);
iterator.remove();
}
}
}
return expired;
}
protected synchronized void flushToDisk() {
if (!memoryList.isEmpty() && store != null) {
long start = 0;
if (LOG.isTraceEnabled()) {
if (LOG.isTraceEnabled()) {
start = System.currentTimeMillis();
LOG.trace("{}, flushToDisk() mem list size: {} {}", new Object[]{ name, memoryList.size(), (systemUsage != null ? systemUsage.getMemoryUsage() : "") });
}
LOG.trace("{}, flushToDisk() mem list size: {} {}", new Object[] { name, memoryList.size(),
(systemUsage != null ? systemUsage.getMemoryUsage() : "") });
}
for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
MessageReference node = iterator.next();
node.decrementReferenceCount();