Usage and MemoryUsage sync fixes to keep state consistent during thread contention.  Test case added.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1481106 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-05-10 17:13:48 +00:00
parent e0ae445900
commit b820ac2bf9
4 changed files with 349 additions and 95 deletions

View File

@ -22,9 +22,9 @@ import org.apache.activemq.store.PersistenceAdapter;
* Used to keep track of how much of something is being used so that a
* productive working set usage can be controlled. Main use case is manage
* memory usage.
*
*
* @org.apache.xbean.XBean
*
*
*/
public class StoreUsage extends Usage<StoreUsage> {
@ -44,6 +44,7 @@ public class StoreUsage extends Usage<StoreUsage> {
this.store = parent.store;
}
@Override
protected long retrieveUsage() {
if (store == null)
return 0;
@ -61,9 +62,12 @@ public class StoreUsage extends Usage<StoreUsage> {
@Override
public int getPercentUsage() {
synchronized (usageMutex) {
usageLock.writeLock().lock();
try {
percentUsage = caclPercentUsage();
return super.getPercentUsage();
} finally {
usageLock.writeLock().unlock();
}
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.usage;
import java.util.concurrent.TimeUnit;
/**
* Used to keep track of how much of something is being used so that a
* productive working set usage can be controlled. Main use case is manage
@ -63,14 +65,26 @@ public class MemoryUsage extends Usage<MemoryUsage> {
if (parent != null) {
parent.waitForSpace();
}
synchronized (usageMutex) {
while (percentUsage >= 100 && isStarted()) {
usageMutex.wait();
usageLock.readLock().lock();
try {
if (percentUsage >= 100 && isStarted()) {
usageLock.readLock().unlock();
usageLock.writeLock().lock();
try {
while (percentUsage >= 100 && isStarted()) {
waitForSpaceCondition.await();
}
usageLock.readLock().lock();
} finally {
usageLock.writeLock().unlock();
}
}
if (percentUsage >= 100 && !isStarted()) {
throw new InterruptedException("waitForSpace stopped during wait.");
}
} finally {
usageLock.readLock().unlock();
}
}
@ -86,11 +100,24 @@ public class MemoryUsage extends Usage<MemoryUsage> {
return false;
}
}
synchronized (usageMutex) {
usageLock.readLock().lock();
try {
if (percentUsage >= 100) {
usageMutex.wait(timeout);
usageLock.readLock().unlock();
usageLock.writeLock().lock();
try {
while (percentUsage >= 100 ) {
waitForSpaceCondition.await(timeout, TimeUnit.MILLISECONDS);
}
usageLock.readLock().lock();
} finally {
usageLock.writeLock().unlock();
}
}
return percentUsage < 100;
} finally {
usageLock.readLock().unlock();
}
}
@ -99,8 +126,11 @@ public class MemoryUsage extends Usage<MemoryUsage> {
if (parent != null && parent.isFull()) {
return true;
}
synchronized (usageMutex) {
usageLock.readLock().lock();
try {
return percentUsage >= 100;
} finally {
usageLock.readLock().unlock();
}
}
@ -125,12 +155,15 @@ public class MemoryUsage extends Usage<MemoryUsage> {
if (value == 0) {
return;
}
int percentUsage;
synchronized (usageMutex) {
usageLock.writeLock().lock();
try {
usage += value;
percentUsage = caclPercentUsage();
setPercentUsage(caclPercentUsage());
} finally {
usageLock.writeLock().unlock();
}
setPercentUsage(percentUsage);
if (parent != null) {
parent.increaseUsage(value);
}
@ -145,12 +178,15 @@ public class MemoryUsage extends Usage<MemoryUsage> {
if (value == 0) {
return;
}
int percentUsage;
synchronized (usageMutex) {
usageLock.writeLock().lock();
try {
usage -= value;
percentUsage = caclPercentUsage();
setPercentUsage(caclPercentUsage());
} finally {
usageLock.writeLock().unlock();
}
setPercentUsage(percentUsage);
if (parent != null) {
parent.decreaseUsage(value);
}

View File

@ -16,21 +16,22 @@
*/
package org.apache.activemq.usage;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Used to keep track of how much of something is being used so that a
* productive working set usage can be controlled. Main use case is manage
* memory usage.
* Used to keep track of how much of something is being used so that a productive working set usage can be controlled.
* Main use case is manage memory usage.
*
* @org.apache.xbean.XBean
*
@ -38,25 +39,29 @@ import org.slf4j.LoggerFactory;
public abstract class Usage<T extends Usage> implements Service {
private static final Logger LOG = LoggerFactory.getLogger(Usage.class);
protected final Object usageMutex = new Object();
protected final ReentrantReadWriteLock usageLock = new ReentrantReadWriteLock();
protected final Condition waitForSpaceCondition = usageLock.writeLock().newCondition();
protected int percentUsage;
protected T parent;
protected String name;
private UsageCapacity limiter = new DefaultUsageCapacity();
private int percentUsageMinDelta = 1;
private final List<UsageListener> listeners = new CopyOnWriteArrayList<UsageListener>();
private final boolean debug = LOG.isDebugEnabled();
protected String name;
private float usagePortion = 1.0f;
private final List<T> children = new CopyOnWriteArrayList<T>();
private final List<Runnable> callbacks = new LinkedList<Runnable>();
private int pollingTime = 100;
private final AtomicBoolean started=new AtomicBoolean();
private final AtomicBoolean started = new AtomicBoolean();
private ThreadPoolExecutor executor;
public Usage(T parent, String name, float portion) {
this.parent = parent;
this.usagePortion = portion;
if (parent != null) {
this.limiter.setLimit((long)(parent.getLimit() * portion));
this.limiter.setLimit((long) (parent.getLimit() * portion));
name = parent.name + ":" + name;
}
this.name = name;
@ -86,15 +91,16 @@ public abstract class Usage<T extends Usage> implements Service {
return false;
}
}
synchronized (usageMutex) {
percentUsage=caclPercentUsage();
usageLock.writeLock().lock();
try {
percentUsage = caclPercentUsage();
if (percentUsage >= highWaterMark) {
long deadline = timeout > 0 ? System.currentTimeMillis() + timeout : Long.MAX_VALUE;
long timeleft = deadline;
while (timeleft > 0) {
percentUsage=caclPercentUsage();
percentUsage = caclPercentUsage();
if (percentUsage >= highWaterMark) {
usageMutex.wait(pollingTime);
waitForSpaceCondition.await(pollingTime, TimeUnit.MILLISECONDS);
timeleft = deadline - System.currentTimeMillis();
} else {
break;
@ -102,6 +108,8 @@ public abstract class Usage<T extends Usage> implements Service {
}
}
return percentUsage < highWaterMark;
} finally {
usageLock.writeLock().unlock();
}
}
@ -113,9 +121,12 @@ public abstract class Usage<T extends Usage> implements Service {
if (parent != null && parent.isFull(highWaterMark)) {
return true;
}
synchronized (usageMutex) {
percentUsage=caclPercentUsage();
usageLock.writeLock().lock();
try {
percentUsage = caclPercentUsage();
return percentUsage >= highWaterMark;
} finally {
usageLock.writeLock().unlock();
}
}
@ -128,16 +139,18 @@ public abstract class Usage<T extends Usage> implements Service {
}
public long getLimit() {
synchronized (usageMutex) {
usageLock.readLock().lock();
try {
return limiter.getLimit();
} finally {
usageLock.readLock().unlock();
}
}
/**
* Sets the memory limit in bytes. Setting the limit in bytes will set the
* usagePortion to 0 since the UsageManager is not going to be portion based
* off the parent.
* When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
* Sets the memory limit in bytes. Setting the limit in bytes will set the usagePortion to 0 since the UsageManager
* is not going to be portion based off the parent. When set using Xbean, values of the form "20 Mb", "1024kb", and
* "1g" can be used
*
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
*/
@ -145,9 +158,12 @@ public abstract class Usage<T extends Usage> implements Service {
if (percentUsageMinDelta < 0) {
throw new IllegalArgumentException("percentUsageMinDelta must be greater or equal to 0");
}
synchronized (usageMutex) {
usageLock.writeLock().lock();
try {
this.limiter.setLimit(limit);
this.usagePortion = 0;
} finally {
usageLock.writeLock().unlock();
}
onLimitChange();
}
@ -155,52 +171,67 @@ public abstract class Usage<T extends Usage> implements Service {
protected void onLimitChange() {
// We may need to calculate the limit
if (usagePortion > 0 && parent != null) {
synchronized (usageMutex) {
this.limiter.setLimit((long)(parent.getLimit() * usagePortion));
usageLock.writeLock().lock();
try {
this.limiter.setLimit((long) (parent.getLimit() * usagePortion));
} finally {
usageLock.writeLock().unlock();
}
}
// Reset the percent currently being used.
int percentUsage;
synchronized (usageMutex) {
percentUsage = caclPercentUsage();
usageLock.writeLock().lock();
try {
setPercentUsage(caclPercentUsage());
} finally {
usageLock.writeLock().unlock();
}
setPercentUsage(percentUsage);
// Let the children know that the limit has changed. They may need to
// set
// their limits based on ours.
// set their limits based on ours.
for (T child : children) {
child.onLimitChange();
}
}
public float getUsagePortion() {
synchronized (usageMutex) {
usageLock.readLock().lock();
try {
return usagePortion;
} finally {
usageLock.readLock().unlock();
}
}
public void setUsagePortion(float usagePortion) {
synchronized (usageMutex) {
usageLock.writeLock().lock();
try {
this.usagePortion = usagePortion;
} finally {
usageLock.writeLock().unlock();
}
onLimitChange();
}
public int getPercentUsage() {
synchronized (usageMutex) {
usageLock.readLock().lock();
try {
return percentUsage;
} finally {
usageLock.readLock().unlock();
}
}
public int getPercentUsageMinDelta() {
synchronized (usageMutex) {
usageLock.readLock().lock();
try {
return percentUsageMinDelta;
} finally {
usageLock.readLock().unlock();
}
}
/**
* Sets the minimum number of percentage points the usage has to change
* before a UsageListener event is fired by the manager.
* Sets the minimum number of percentage points the usage has to change before a UsageListener event is fired by the
* manager.
*
* @param percentUsageMinDelta
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
@ -209,27 +240,35 @@ public abstract class Usage<T extends Usage> implements Service {
if (percentUsageMinDelta < 1) {
throw new IllegalArgumentException("percentUsageMinDelta must be greater than 0");
}
int percentUsage;
synchronized (usageMutex) {
usageLock.writeLock().lock();
try {
this.percentUsageMinDelta = percentUsageMinDelta;
percentUsage = caclPercentUsage();
setPercentUsage(caclPercentUsage());
} finally {
usageLock.writeLock().unlock();
}
setPercentUsage(percentUsage);
}
public long getUsage() {
synchronized (usageMutex) {
usageLock.readLock().lock();
try {
return retrieveUsage();
} finally {
usageLock.readLock().unlock();
}
}
protected void setPercentUsage(int value) {
synchronized (usageMutex) {
usageLock.writeLock().lock();
try {
int oldValue = percentUsage;
percentUsage = value;
if (oldValue != value) {
fireEvent(oldValue, value);
}
} finally {
usageLock.writeLock().unlock();
}
}
@ -237,26 +276,23 @@ public abstract class Usage<T extends Usage> implements Service {
if (limiter.getLimit() == 0) {
return 0;
}
return (int)((((retrieveUsage() * 100) / limiter.getLimit()) / percentUsageMinDelta) * percentUsageMinDelta);
return (int) ((((retrieveUsage() * 100) / limiter.getLimit()) / percentUsageMinDelta) * percentUsageMinDelta);
}
// Must be called with the usage lock's writeLock held.
private void fireEvent(final int oldPercentUsage, final int newPercentUsage) {
if (debug) {
LOG.debug(getName() + ": usage change from: " + oldPercentUsage + "% of available memory, to: "
+ newPercentUsage + "% of available memory");
LOG.debug(getName() + ": usage change from: " + oldPercentUsage + "% of available memory, to: " + newPercentUsage + "% of available memory");
}
if (started.get()) {
// Switching from being full to not being full..
if (oldPercentUsage >= 100 && newPercentUsage < 100) {
synchronized (usageMutex) {
usageMutex.notifyAll();
if (!callbacks.isEmpty()) {
for (Iterator<Runnable> iter = new ArrayList<Runnable>(callbacks).iterator(); iter.hasNext();) {
Runnable callback = iter.next();
getExecutor().execute(callback);
}
callbacks.clear();
waitForSpaceCondition.signalAll();
if (!callbacks.isEmpty()) {
for (Runnable callback : callbacks) {
getExecutor().execute(callback);
}
callbacks.clear();
}
}
if (!listeners.isEmpty()) {
@ -264,9 +300,8 @@ public abstract class Usage<T extends Usage> implements Service {
Runnable listenerNotifier = new Runnable() {
@Override
public void run() {
for (Iterator<UsageListener> iter = listeners.iterator(); iter.hasNext();) {
UsageListener l = iter.next();
l.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage);
for (UsageListener listener : listeners) {
listener.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage);
}
}
};
@ -285,24 +320,21 @@ public abstract class Usage<T extends Usage> implements Service {
@Override
public String toString() {
return "Usage(" + getName() + ") percentUsage=" + percentUsage
+ "%, usage=" + retrieveUsage() + ", limit=" + limiter.getLimit()
+ ", percentUsageMinDelta=" + percentUsageMinDelta + "%"
+ (parent != null ? ";Parent:" + parent.toString() : "");
return "Usage(" + getName() + ") percentUsage=" + percentUsage + "%, usage=" + retrieveUsage() + ", limit=" + limiter.getLimit()
+ ", percentUsageMinDelta=" + percentUsageMinDelta + "%" + (parent != null ? ";Parent:" + parent.toString() : "");
}
@Override
@SuppressWarnings("unchecked")
public void start() {
if (started.compareAndSet(false, true)){
if (started.compareAndSet(false, true)) {
if (parent != null) {
parent.addChild(this);
if(getLimit() > parent.getLimit()) {
LOG.info("Usage({}) limit={} should be smaller than its parent limit={}",
new Object[]{getName(), getLimit(), parent.getLimit()});
if (getLimit() > parent.getLimit()) {
LOG.info("Usage({}) limit={} should be smaller than its parent limit={}", new Object[] { getName(), getLimit(), parent.getLimit() });
}
}
for (T t:children) {
for (T t : children) {
t.start();
}
}
@ -311,21 +343,24 @@ public abstract class Usage<T extends Usage> implements Service {
@Override
@SuppressWarnings("unchecked")
public void stop() {
if (started.compareAndSet(true, false)){
if (started.compareAndSet(true, false)) {
if (parent != null) {
parent.removeChild(this);
}
//clear down any callbacks
synchronized (usageMutex) {
usageMutex.notifyAll();
for (Iterator<Runnable> iter = new ArrayList<Runnable>(this.callbacks).iterator(); iter.hasNext();) {
Runnable callback = iter.next();
// clear down any callbacks
usageLock.writeLock().lock();
try {
waitForSpaceCondition.signalAll();
for (Runnable callback : this.callbacks) {
callback.run();
}
this.callbacks.clear();
} finally {
usageLock.writeLock().unlock();
}
for (T t:children) {
for (T t : children) {
t.stop();
}
}
@ -344,8 +379,7 @@ public abstract class Usage<T extends Usage> implements Service {
/**
* @param callback
* @return true if the UsageManager was full. The callback will only be
* called if this method returns true.
* @return true if the UsageManager was full. The callback will only be called if this method returns true.
*/
public boolean notifyCallbackWhenNotFull(final Runnable callback) {
if (parent != null) {
@ -353,12 +387,15 @@ public abstract class Usage<T extends Usage> implements Service {
@Override
public void run() {
synchronized (usageMutex) {
usageLock.writeLock().lock();
try {
if (percentUsage >= 100) {
callbacks.add(callback);
} else {
callback.run();
}
} finally {
usageLock.writeLock().unlock();
}
}
};
@ -366,13 +403,16 @@ public abstract class Usage<T extends Usage> implements Service {
return true;
}
}
synchronized (usageMutex) {
usageLock.writeLock().lock();
try {
if (percentUsage >= 100) {
callbacks.add(callback);
return true;
} else {
return false;
}
} finally {
usageLock.writeLock().unlock();
}
}
@ -384,7 +424,8 @@ public abstract class Usage<T extends Usage> implements Service {
}
/**
* @param limiter the limiter to set
* @param limiter
* the limiter to set
*/
public void setLimiter(UsageCapacity limiter) {
this.limiter = limiter;
@ -398,7 +439,8 @@ public abstract class Usage<T extends Usage> implements Service {
}
/**
* @param pollingTime the pollingTime to set
* @param pollingTime
* the pollingTime to set
*/
public void setPollingTime(int pollingTime) {
this.pollingTime = pollingTime;
@ -416,7 +458,7 @@ public abstract class Usage<T extends Usage> implements Service {
this.parent = parent;
}
public void setExecutor (ThreadPoolExecutor executor) {
public void setExecutor(ThreadPoolExecutor executor) {
this.executor = executor;
}

View File

@ -0,0 +1,172 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usage;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Test;
public class MemoryUsageConcurrencyTest {
@Test
public void testCycle() throws Exception {
Random r = new Random(0xb4a14);
for (int i = 0; i < 50000; i++) {
checkPercentage(i, i, r.nextInt(100) + 10, i % 2 == 0, i % 5 == 0);
}
}
private void checkPercentage(int attempt, int seed, int operations, boolean useArrayBlocking, boolean useWaitForSpaceThread) throws InterruptedException {
final BlockingQueue<Integer> toAdd;
final BlockingQueue<Integer> toRemove;
final BlockingQueue<Integer> removed;
if (useArrayBlocking) {
toAdd = new ArrayBlockingQueue<Integer>(operations);
toRemove = new ArrayBlockingQueue<Integer>(operations);
removed = new ArrayBlockingQueue<Integer>(operations);
} else {
toAdd = new LinkedBlockingQueue<Integer>();
toRemove = new LinkedBlockingQueue<Integer>();
removed = new LinkedBlockingQueue<Integer>();
}
final AtomicBoolean running = new AtomicBoolean(true);
final CountDownLatch startLatch = new CountDownLatch(1);
final MemoryUsage memUsage = new MemoryUsage();
memUsage.setLimit(1000);
memUsage.start();
Thread addThread = new Thread(new Runnable() {
@Override
public void run() {
try {
startLatch.await();
while (true) {
Integer add = toAdd.poll(1, TimeUnit.MILLISECONDS);
if (add == null) {
if (!running.get()) {
break;
}
} else {
// add to other queue before removing
toRemove.add(add);
memUsage.increaseUsage(add);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
Thread removeThread = new Thread(new Runnable() {
@Override
public void run() {
try {
startLatch.await();
while (true) {
Integer remove = toRemove.poll(1, TimeUnit.MILLISECONDS);
if (remove == null) {
if (!running.get()) {
break;
}
} else {
memUsage.decreaseUsage(remove);
removed.add(remove);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
Thread waitForSpaceThread = new Thread(new Runnable() {
@Override
public void run() {
try {
startLatch.await();
while (running.get()) {
memUsage.waitForSpace();
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
removeThread.start();
addThread.start();
if (useWaitForSpaceThread) {
waitForSpaceThread.start();
}
Random r = new Random(seed);
startLatch.countDown();
for (int i = 0; i < operations; i++) {
toAdd.add(r.nextInt(100) + 1);
}
// we expect the failure percentage to be related to the last operation
List<Integer> ops = new ArrayList<Integer>(operations);
for (int i = 0; i < operations; i++) {
Integer op = removed.poll(1000, TimeUnit.MILLISECONDS);
assertNotNull(op);
ops.add(op);
}
running.set(false);
if (useWaitForSpaceThread) {
try {
waitForSpaceThread.join(1000);
} catch (InterruptedException e) {
System.out.println("Attempt: " + attempt + " : " + memUsage + " waitForSpace never returned");
waitForSpaceThread.interrupt();
waitForSpaceThread.join();
}
}
removeThread.join();
addThread.join();
if (memUsage.getPercentUsage() != 0 || memUsage.getUsage() != memUsage.getPercentUsage()) {
System.out.println("Attempt: " + attempt + " : " + memUsage);
System.out.println("Operations: " + ops);
assertEquals(0, memUsage.getPercentUsage());
}
}
}