HDFS-5276. FileSystem.Statistics should use thread-local counters to avoid multi-threaded performance issues on read/write. (Colin Patrick McCabe)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1533668 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2baa42dd01
commit
e86f4a2e25
|
@ -363,6 +363,9 @@ Release 2.3.0 - UNRELEASED
|
||||||
HADOOP-9078. enhance unit-test coverage of class
|
HADOOP-9078. enhance unit-test coverage of class
|
||||||
org.apache.hadoop.fs.FileContext (Ivan A. Veselovsky via jeagles)
|
org.apache.hadoop.fs.FileContext (Ivan A. Veselovsky via jeagles)
|
||||||
|
|
||||||
|
HDFS-5276. FileSystem.Statistics should use thread-local counters to avoid
|
||||||
|
multi-threaded performance issues on read/write. (Colin Patrick McCabe)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn)
|
HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn)
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.fs;
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.lang.ref.WeakReference;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
@ -31,6 +32,7 @@ import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.IdentityHashMap;
|
import java.util.IdentityHashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.NoSuchElementException;
|
import java.util.NoSuchElementException;
|
||||||
|
@ -2501,28 +2503,149 @@ public abstract class FileSystem extends Configured implements Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tracks statistics about how many reads, writes, and so forth have been
|
||||||
|
* done in a FileSystem.
|
||||||
|
*
|
||||||
|
* Since there is only one of these objects per FileSystem, there will
|
||||||
|
* typically be many threads writing to this object. Almost every operation
|
||||||
|
* on an open file will involve a write to this object. In contrast, reading
|
||||||
|
* statistics is done infrequently by most programs, and not at all by others.
|
||||||
|
* Hence, this is optimized for writes.
|
||||||
|
*
|
||||||
|
* Each thread writes to its own thread-local area of memory. This removes
|
||||||
|
* contention and allows us to scale up to many, many threads. To read
|
||||||
|
* statistics, the reader thread totals up the contents of all of the
|
||||||
|
* thread-local data areas.
|
||||||
|
*/
|
||||||
public static final class Statistics {
|
public static final class Statistics {
|
||||||
|
/**
|
||||||
|
* Statistics data.
|
||||||
|
*
|
||||||
|
* There is only a single writer to thread-local StatisticsData objects.
|
||||||
|
* Hence, volatile is adequate here-- we do not need AtomicLong or similar
|
||||||
|
* to prevent lost updates.
|
||||||
|
* The Java specification guarantees that updates to volatile longs will
|
||||||
|
* be perceived as atomic with respect to other threads, which is all we
|
||||||
|
* need.
|
||||||
|
*/
|
||||||
|
private static class StatisticsData {
|
||||||
|
volatile long bytesRead;
|
||||||
|
volatile long bytesWritten;
|
||||||
|
volatile int readOps;
|
||||||
|
volatile int largeReadOps;
|
||||||
|
volatile int writeOps;
|
||||||
|
/**
|
||||||
|
* Stores a weak reference to the thread owning this StatisticsData.
|
||||||
|
* This allows us to remove StatisticsData objects that pertain to
|
||||||
|
* threads that no longer exist.
|
||||||
|
*/
|
||||||
|
final WeakReference<Thread> owner;
|
||||||
|
|
||||||
|
StatisticsData(WeakReference<Thread> owner) {
|
||||||
|
this.owner = owner;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add another StatisticsData object to this one.
|
||||||
|
*/
|
||||||
|
void add(StatisticsData other) {
|
||||||
|
this.bytesRead += other.bytesRead;
|
||||||
|
this.bytesWritten += other.bytesWritten;
|
||||||
|
this.readOps += other.readOps;
|
||||||
|
this.largeReadOps += other.largeReadOps;
|
||||||
|
this.writeOps += other.writeOps;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Negate the values of all statistics.
|
||||||
|
*/
|
||||||
|
void negate() {
|
||||||
|
this.bytesRead = -this.bytesRead;
|
||||||
|
this.bytesWritten = -this.bytesWritten;
|
||||||
|
this.readOps = -this.readOps;
|
||||||
|
this.largeReadOps = -this.largeReadOps;
|
||||||
|
this.writeOps = -this.writeOps;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return bytesRead + " bytes read, " + bytesWritten + " bytes written, "
|
||||||
|
+ readOps + " read ops, " + largeReadOps + " large read ops, "
|
||||||
|
+ writeOps + " write ops";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private interface StatisticsAggregator<T> {
|
||||||
|
void accept(StatisticsData data);
|
||||||
|
T aggregate();
|
||||||
|
}
|
||||||
|
|
||||||
private final String scheme;
|
private final String scheme;
|
||||||
private AtomicLong bytesRead = new AtomicLong();
|
|
||||||
private AtomicLong bytesWritten = new AtomicLong();
|
/**
|
||||||
private AtomicInteger readOps = new AtomicInteger();
|
* rootData is data that doesn't belong to any thread, but will be added
|
||||||
private AtomicInteger largeReadOps = new AtomicInteger();
|
* to the totals. This is useful for making copies of Statistics objects,
|
||||||
private AtomicInteger writeOps = new AtomicInteger();
|
* and for storing data that pertains to threads that have been garbage
|
||||||
|
* collected. Protected by the Statistics lock.
|
||||||
|
*/
|
||||||
|
private final StatisticsData rootData;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Thread-local data.
|
||||||
|
*/
|
||||||
|
private final ThreadLocal<StatisticsData> threadData;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* List of all thread-local data areas. Protected by the Statistics lock.
|
||||||
|
*/
|
||||||
|
private LinkedList<StatisticsData> allData;
|
||||||
|
|
||||||
public Statistics(String scheme) {
|
public Statistics(String scheme) {
|
||||||
this.scheme = scheme;
|
this.scheme = scheme;
|
||||||
|
this.rootData = new StatisticsData(null);
|
||||||
|
this.threadData = new ThreadLocal<StatisticsData>();
|
||||||
|
this.allData = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Copy constructor.
|
* Copy constructor.
|
||||||
*
|
*
|
||||||
* @param st
|
* @param other The input Statistics object which is cloned.
|
||||||
* The input Statistics object which is cloned.
|
|
||||||
*/
|
*/
|
||||||
public Statistics(Statistics st) {
|
public Statistics(Statistics other) {
|
||||||
this.scheme = st.scheme;
|
this.scheme = other.scheme;
|
||||||
this.bytesRead = new AtomicLong(st.bytesRead.longValue());
|
this.rootData = new StatisticsData(null);
|
||||||
this.bytesWritten = new AtomicLong(st.bytesWritten.longValue());
|
other.visitAll(new StatisticsAggregator<Void>() {
|
||||||
|
@Override
|
||||||
|
public void accept(StatisticsData data) {
|
||||||
|
rootData.add(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Void aggregate() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
this.threadData = new ThreadLocal<StatisticsData>();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get or create the thread-local data associated with the current thread.
|
||||||
|
*/
|
||||||
|
private StatisticsData getThreadData() {
|
||||||
|
StatisticsData data = threadData.get();
|
||||||
|
if (data == null) {
|
||||||
|
data = new StatisticsData(
|
||||||
|
new WeakReference<Thread>(Thread.currentThread()));
|
||||||
|
threadData.set(data);
|
||||||
|
synchronized(this) {
|
||||||
|
if (allData == null) {
|
||||||
|
allData = new LinkedList<StatisticsData>();
|
||||||
|
}
|
||||||
|
allData.add(data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return data;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -2530,7 +2653,7 @@ public abstract class FileSystem extends Configured implements Closeable {
|
||||||
* @param newBytes the additional bytes read
|
* @param newBytes the additional bytes read
|
||||||
*/
|
*/
|
||||||
public void incrementBytesRead(long newBytes) {
|
public void incrementBytesRead(long newBytes) {
|
||||||
bytesRead.getAndAdd(newBytes);
|
getThreadData().bytesRead += newBytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -2538,7 +2661,7 @@ public abstract class FileSystem extends Configured implements Closeable {
|
||||||
* @param newBytes the additional bytes written
|
* @param newBytes the additional bytes written
|
||||||
*/
|
*/
|
||||||
public void incrementBytesWritten(long newBytes) {
|
public void incrementBytesWritten(long newBytes) {
|
||||||
bytesWritten.getAndAdd(newBytes);
|
getThreadData().bytesWritten += newBytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -2546,7 +2669,7 @@ public abstract class FileSystem extends Configured implements Closeable {
|
||||||
* @param count number of read operations
|
* @param count number of read operations
|
||||||
*/
|
*/
|
||||||
public void incrementReadOps(int count) {
|
public void incrementReadOps(int count) {
|
||||||
readOps.getAndAdd(count);
|
getThreadData().readOps += count;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -2554,7 +2677,7 @@ public abstract class FileSystem extends Configured implements Closeable {
|
||||||
* @param count number of large read operations
|
* @param count number of large read operations
|
||||||
*/
|
*/
|
||||||
public void incrementLargeReadOps(int count) {
|
public void incrementLargeReadOps(int count) {
|
||||||
largeReadOps.getAndAdd(count);
|
getThreadData().largeReadOps += count;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -2562,7 +2685,38 @@ public abstract class FileSystem extends Configured implements Closeable {
|
||||||
* @param count number of write operations
|
* @param count number of write operations
|
||||||
*/
|
*/
|
||||||
public void incrementWriteOps(int count) {
|
public void incrementWriteOps(int count) {
|
||||||
writeOps.getAndAdd(count);
|
getThreadData().writeOps += count;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Apply the given aggregator to all StatisticsData objects associated with
|
||||||
|
* this Statistics object.
|
||||||
|
*
|
||||||
|
* For each StatisticsData object, we will call accept on the visitor.
|
||||||
|
* Finally, at the end, we will call aggregate to get the final total.
|
||||||
|
*
|
||||||
|
* @param visitor The visitor to use.
|
||||||
|
* @return The total.
|
||||||
|
*/
|
||||||
|
private synchronized <T> T visitAll(StatisticsAggregator<T> visitor) {
|
||||||
|
visitor.accept(rootData);
|
||||||
|
if (allData != null) {
|
||||||
|
for (Iterator<StatisticsData> iter = allData.iterator();
|
||||||
|
iter.hasNext(); ) {
|
||||||
|
StatisticsData data = iter.next();
|
||||||
|
visitor.accept(data);
|
||||||
|
if (data.owner.get() == null) {
|
||||||
|
/*
|
||||||
|
* If the thread that created this thread-local data no
|
||||||
|
* longer exists, remove the StatisticsData from our list
|
||||||
|
* and fold the values into rootData.
|
||||||
|
*/
|
||||||
|
rootData.add(data);
|
||||||
|
iter.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return visitor.aggregate();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -2570,7 +2724,18 @@ public abstract class FileSystem extends Configured implements Closeable {
|
||||||
* @return the number of bytes
|
* @return the number of bytes
|
||||||
*/
|
*/
|
||||||
public long getBytesRead() {
|
public long getBytesRead() {
|
||||||
return bytesRead.get();
|
return visitAll(new StatisticsAggregator<Long>() {
|
||||||
|
private long bytesRead = 0;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void accept(StatisticsData data) {
|
||||||
|
bytesRead += data.bytesRead;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long aggregate() {
|
||||||
|
return bytesRead;
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -2578,7 +2743,18 @@ public abstract class FileSystem extends Configured implements Closeable {
|
||||||
* @return the number of bytes
|
* @return the number of bytes
|
||||||
*/
|
*/
|
||||||
public long getBytesWritten() {
|
public long getBytesWritten() {
|
||||||
return bytesWritten.get();
|
return visitAll(new StatisticsAggregator<Long>() {
|
||||||
|
private long bytesWritten = 0;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void accept(StatisticsData data) {
|
||||||
|
bytesWritten += data.bytesWritten;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long aggregate() {
|
||||||
|
return bytesWritten;
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -2586,7 +2762,19 @@ public abstract class FileSystem extends Configured implements Closeable {
|
||||||
* @return number of read operations
|
* @return number of read operations
|
||||||
*/
|
*/
|
||||||
public int getReadOps() {
|
public int getReadOps() {
|
||||||
return readOps.get() + largeReadOps.get();
|
return visitAll(new StatisticsAggregator<Integer>() {
|
||||||
|
private int readOps = 0;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void accept(StatisticsData data) {
|
||||||
|
readOps += data.readOps;
|
||||||
|
readOps += data.largeReadOps;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer aggregate() {
|
||||||
|
return readOps;
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -2595,7 +2783,18 @@ public abstract class FileSystem extends Configured implements Closeable {
|
||||||
* @return number of large read operations
|
* @return number of large read operations
|
||||||
*/
|
*/
|
||||||
public int getLargeReadOps() {
|
public int getLargeReadOps() {
|
||||||
return largeReadOps.get();
|
return visitAll(new StatisticsAggregator<Integer>() {
|
||||||
|
private int largeReadOps = 0;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void accept(StatisticsData data) {
|
||||||
|
largeReadOps += data.largeReadOps;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer aggregate() {
|
||||||
|
return largeReadOps;
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -2604,22 +2803,70 @@ public abstract class FileSystem extends Configured implements Closeable {
|
||||||
* @return number of write operations
|
* @return number of write operations
|
||||||
*/
|
*/
|
||||||
public int getWriteOps() {
|
public int getWriteOps() {
|
||||||
return writeOps.get();
|
return visitAll(new StatisticsAggregator<Integer>() {
|
||||||
|
private int writeOps = 0;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void accept(StatisticsData data) {
|
||||||
|
writeOps += data.writeOps;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer aggregate() {
|
||||||
|
return writeOps;
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return bytesRead + " bytes read, " + bytesWritten + " bytes written, "
|
return visitAll(new StatisticsAggregator<String>() {
|
||||||
+ readOps + " read ops, " + largeReadOps + " large read ops, "
|
private StatisticsData total = new StatisticsData(null);
|
||||||
+ writeOps + " write ops";
|
|
||||||
|
@Override
|
||||||
|
public void accept(StatisticsData data) {
|
||||||
|
total.add(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String aggregate() {
|
||||||
|
return total.toString();
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reset the counts of bytes to 0.
|
* Resets all statistics to 0.
|
||||||
|
*
|
||||||
|
* In order to reset, we add up all the thread-local statistics data, and
|
||||||
|
* set rootData to the negative of that.
|
||||||
|
*
|
||||||
|
* This may seem like a counterintuitive way to reset the statistics. Why
|
||||||
|
* can't we just zero out all the thread-local data? Well, thread-local
|
||||||
|
* data can only be modified by the thread that owns it. If we tried to
|
||||||
|
* modify the thread-local data from this thread, our modification might get
|
||||||
|
* interleaved with a read-modify-write operation done by the thread that
|
||||||
|
* owns the data. That would result in our update getting lost.
|
||||||
|
*
|
||||||
|
* The approach used here avoids this problem because it only ever reads
|
||||||
|
* (not writes) the thread-local data. Both reads and writes to rootData
|
||||||
|
* are done under the lock, so we're free to modify rootData from any thread
|
||||||
|
* that holds the lock.
|
||||||
*/
|
*/
|
||||||
public void reset() {
|
public void reset() {
|
||||||
bytesWritten.set(0);
|
visitAll(new StatisticsAggregator<Void>() {
|
||||||
bytesRead.set(0);
|
private StatisticsData total = new StatisticsData(null);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void accept(StatisticsData data) {
|
||||||
|
total.add(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Void aggregate() {
|
||||||
|
total.negate();
|
||||||
|
rootData.add(total);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -27,6 +27,8 @@ import org.apache.hadoop.fs.FileSystem.Statistics;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.Uninterruptibles;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.FileContextTestHelper.*;
|
import static org.apache.hadoop.fs.FileContextTestHelper.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -44,6 +46,38 @@ public abstract class FCStatisticsBaseTest {
|
||||||
//fc should be set appropriately by the deriving test.
|
//fc should be set appropriately by the deriving test.
|
||||||
protected static FileContext fc = null;
|
protected static FileContext fc = null;
|
||||||
|
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testStatisticsOperations() throws Exception {
|
||||||
|
final Statistics stats = new Statistics("file");
|
||||||
|
Assert.assertEquals(0L, stats.getBytesRead());
|
||||||
|
Assert.assertEquals(0L, stats.getBytesWritten());
|
||||||
|
Assert.assertEquals(0, stats.getWriteOps());
|
||||||
|
stats.incrementBytesWritten(1000);
|
||||||
|
Assert.assertEquals(1000L, stats.getBytesWritten());
|
||||||
|
Assert.assertEquals(0, stats.getWriteOps());
|
||||||
|
stats.incrementWriteOps(123);
|
||||||
|
Assert.assertEquals(123, stats.getWriteOps());
|
||||||
|
|
||||||
|
Thread thread = new Thread() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
stats.incrementWriteOps(1);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
thread.start();
|
||||||
|
Uninterruptibles.joinUninterruptibly(thread);
|
||||||
|
Assert.assertEquals(124, stats.getWriteOps());
|
||||||
|
// Test copy constructor and reset function
|
||||||
|
Statistics stats2 = new Statistics(stats);
|
||||||
|
stats.reset();
|
||||||
|
Assert.assertEquals(0, stats.getWriteOps());
|
||||||
|
Assert.assertEquals(0L, stats.getBytesWritten());
|
||||||
|
Assert.assertEquals(0L, stats.getBytesRead());
|
||||||
|
Assert.assertEquals(124, stats2.getWriteOps());
|
||||||
|
Assert.assertEquals(1000L, stats2.getBytesWritten());
|
||||||
|
Assert.assertEquals(0L, stats2.getBytesRead());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testStatistics() throws IOException, URISyntaxException {
|
public void testStatistics() throws IOException, URISyntaxException {
|
||||||
URI fsUri = getFsUri();
|
URI fsUri = getFsUri();
|
||||||
|
|
Loading…
Reference in New Issue