HADOOP-12107. long running apps may have a huge number of StatisticsData instances under FileSystem (Sangjin Lee via Ming Ma)

(cherry picked from commit 8e1bdc17d9)

Conflicts:
	hadoop-common-project/hadoop-common/CHANGES.txt
This commit is contained in:
Ming Ma 2015-06-29 14:37:38 -07:00
parent c1447e654d
commit 5e6bbe6031
3 changed files with 155 additions and 44 deletions

View File

@ -539,6 +539,9 @@ Release 2.7.1 - 2015-07-06
that method doesn't modify the FsPermission (Bibin A Chundatt via Colin P.
McCabe)
HADOOP-12107. long running apps may have a huge number of StatisticsData
instances under FileSystem (Sangjin Lee via Ming Ma)
Release 2.7.0 - 2015-04-20
INCOMPATIBLE CHANGES

View File

@ -20,7 +20,8 @@ package org.apache.hadoop.fs;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.lang.ref.PhantomReference;
import java.lang.ref.ReferenceQueue;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
@ -32,7 +33,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@ -2918,16 +2918,6 @@ public abstract class FileSystem extends Configured implements Closeable {
volatile int readOps;
volatile int largeReadOps;
volatile int writeOps;
/**
* Stores a weak reference to the thread owning this StatisticsData.
* This allows us to remove StatisticsData objects that pertain to
* threads that no longer exist.
*/
final WeakReference<Thread> owner;
StatisticsData(WeakReference<Thread> owner) {
this.owner = owner;
}
/**
* Add another StatisticsData object to this one.
@ -2998,17 +2988,37 @@ public abstract class FileSystem extends Configured implements Closeable {
* Thread-local data.
*/
private final ThreadLocal<StatisticsData> threadData;
/**
* List of all thread-local data areas. Protected by the Statistics lock.
* Set of all thread-local data areas. Protected by the Statistics lock.
* The references to the statistics data are kept using phantom references
* to the associated threads. Proper clean-up is performed by the cleaner
* thread when the threads are garbage collected.
*/
private LinkedList<StatisticsData> allData;
private final Set<StatisticsDataReference> allData;
/**
* Global reference queue and a cleaner thread that manage statistics data
* references from all filesystem instances.
*/
private static final ReferenceQueue<Thread> STATS_DATA_REF_QUEUE;
private static final Thread STATS_DATA_CLEANER;
static {
STATS_DATA_REF_QUEUE = new ReferenceQueue<Thread>();
// start a single daemon cleaner thread
STATS_DATA_CLEANER = new Thread(new StatisticsDataReferenceCleaner());
STATS_DATA_CLEANER.
setName(StatisticsDataReferenceCleaner.class.getName());
STATS_DATA_CLEANER.setDaemon(true);
STATS_DATA_CLEANER.start();
}
public Statistics(String scheme) {
this.scheme = scheme;
this.rootData = new StatisticsData(null);
this.rootData = new StatisticsData();
this.threadData = new ThreadLocal<StatisticsData>();
this.allData = null;
this.allData = new HashSet<StatisticsDataReference>();
}
/**
@ -3018,7 +3028,7 @@ public abstract class FileSystem extends Configured implements Closeable {
*/
public Statistics(Statistics other) {
this.scheme = other.scheme;
this.rootData = new StatisticsData(null);
this.rootData = new StatisticsData();
other.visitAll(new StatisticsAggregator<Void>() {
@Override
public void accept(StatisticsData data) {
@ -3030,6 +3040,63 @@ public abstract class FileSystem extends Configured implements Closeable {
}
});
this.threadData = new ThreadLocal<StatisticsData>();
this.allData = new HashSet<StatisticsDataReference>();
}
/**
* A phantom reference to a thread that also includes the data associated
* with that thread. On the thread being garbage collected, it is enqueued
* to the reference queue for clean-up.
*/
private class StatisticsDataReference extends PhantomReference<Thread> {
private final StatisticsData data;
public StatisticsDataReference(StatisticsData data, Thread thread) {
super(thread, STATS_DATA_REF_QUEUE);
this.data = data;
}
public StatisticsData getData() {
return data;
}
/**
* Performs clean-up action when the associated thread is garbage
* collected.
*/
public void cleanUp() {
// use the statistics lock for safety
synchronized (Statistics.this) {
/*
* If the thread that created this thread-local data no longer exists,
* remove the StatisticsData from our list and fold the values into
* rootData.
*/
rootData.add(data);
allData.remove(this);
}
}
}
/**
* Background action to act on references being removed.
*/
private static class StatisticsDataReferenceCleaner implements Runnable {
@Override
public void run() {
while (true) {
try {
StatisticsDataReference ref =
(StatisticsDataReference)STATS_DATA_REF_QUEUE.remove();
ref.cleanUp();
} catch (Throwable th) {
// the cleaner thread should continue to run even if there are
// exceptions, including InterruptedException
LOG.warn("exception in the cleaner thread but it will continue to "
+ "run", th);
}
}
}
}
/**
@ -3038,14 +3105,12 @@ public abstract class FileSystem extends Configured implements Closeable {
public StatisticsData getThreadStatistics() {
StatisticsData data = threadData.get();
if (data == null) {
data = new StatisticsData(
new WeakReference<Thread>(Thread.currentThread()));
data = new StatisticsData();
threadData.set(data);
StatisticsDataReference ref =
new StatisticsDataReference(data, Thread.currentThread());
synchronized(this) {
if (allData == null) {
allData = new LinkedList<StatisticsData>();
}
allData.add(data);
allData.add(ref);
}
}
return data;
@ -3103,21 +3168,9 @@ public abstract class FileSystem extends Configured implements Closeable {
*/
private synchronized <T> T visitAll(StatisticsAggregator<T> visitor) {
visitor.accept(rootData);
if (allData != null) {
for (Iterator<StatisticsData> iter = allData.iterator();
iter.hasNext(); ) {
StatisticsData data = iter.next();
visitor.accept(data);
if (data.owner.get() == null) {
/*
* If the thread that created this thread-local data no
* longer exists, remove the StatisticsData from our list
* and fold the values into rootData.
*/
rootData.add(data);
iter.remove();
}
}
for (StatisticsDataReference ref: allData) {
StatisticsData data = ref.getData();
visitor.accept(data);
}
return visitor.aggregate();
}
@ -3224,7 +3277,7 @@ public abstract class FileSystem extends Configured implements Closeable {
@Override
public String toString() {
return visitAll(new StatisticsAggregator<String>() {
private StatisticsData total = new StatisticsData(null);
private StatisticsData total = new StatisticsData();
@Override
public void accept(StatisticsData data) {
@ -3257,7 +3310,7 @@ public abstract class FileSystem extends Configured implements Closeable {
*/
public void reset() {
visitAll(new StatisticsAggregator<Void>() {
private StatisticsData total = new StatisticsData(null);
private StatisticsData total = new StatisticsData();
@Override
public void accept(StatisticsData data) {
@ -3279,6 +3332,11 @@ public abstract class FileSystem extends Configured implements Closeable {
public String getScheme() {
return scheme;
}
@VisibleForTesting
synchronized int getAllThreadLocalDataSize() {
return allData.size();
}
}
/**

View File

@ -18,26 +18,34 @@
package org.apache.hadoop.fs;
import static org.apache.hadoop.fs.FileContextTestHelper.createFile;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.Uninterruptibles;
import static org.apache.hadoop.fs.FileContextTestHelper.*;
/**
* <p>
* Base class to test {@link FileContext} Statistics.
* </p>
*/
public abstract class FCStatisticsBaseTest {
static protected int blockSize = 512;
static protected int numBlocks = 1;
@ -102,6 +110,48 @@ public abstract class FCStatisticsBaseTest {
fc.delete(filePath, true);
}
@Test(timeout=60000)
public void testStatisticsThreadLocalDataCleanUp() throws Exception {
final Statistics stats = new Statistics("test");
// create a small thread pool to test the statistics
final int size = 2;
ExecutorService es = Executors.newFixedThreadPool(size);
List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>(size);
for (int i = 0; i < size; i++) {
tasks.add(new Callable<Boolean>() {
public Boolean call() {
// this populates the data set in statistics
stats.incrementReadOps(1);
return true;
}
});
}
// run the threads
es.invokeAll(tasks);
// assert that the data size is exactly the number of threads
final AtomicInteger allDataSize = new AtomicInteger(0);
allDataSize.set(stats.getAllThreadLocalDataSize());
Assert.assertEquals(size, allDataSize.get());
Assert.assertEquals(size, stats.getReadOps());
// force the GC to collect the threads by shutting down the thread pool
es.shutdownNow();
es.awaitTermination(1, TimeUnit.MINUTES);
es = null;
System.gc();
// wait for up to 10 seconds
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
int size = stats.getAllThreadLocalDataSize();
allDataSize.set(size);
return size == 0;
}
}, 1000, 10*1000);
Assert.assertEquals(0, allDataSize.get());
Assert.assertEquals(size, stats.getReadOps());
}
/**
* Bytes read may be different for different file systems. This method should
* throw assertion error if bytes read are incorrect.