From 75968551bd2e3744af5b924f4aa18f0cbcf36a3e Mon Sep 17 00:00:00 2001 From: Colin McCabe Date: Sat, 19 Oct 2013 00:38:13 +0000 Subject: [PATCH] HDFS-5276. FileSystem.Statistics should use thread-local counters to avoid multi-threaded performance issues on read/write. (Colin Patrick McCabe) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1533670 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 3 + .../java/org/apache/hadoop/fs/FileSystem.java | 303 ++++++++++++++++-- .../hadoop/fs/FCStatisticsBaseTest.java | 34 ++ 3 files changed, 312 insertions(+), 28 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index da670c647c2..4dd2bc9c7d4 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -76,6 +76,9 @@ Release 2.3.0 - UNRELEASED HADOOP-9078. enhance unit-test coverage of class org.apache.hadoop.fs.FileContext (Ivan A. Veselovsky via jeagles) + HDFS-5276. FileSystem.Statistics should use thread-local counters to avoid + multi-threaded performance issues on read/write. (Colin Patrick McCabe) + OPTIMIZATIONS HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index 9f35c52c684..d44418b0ab9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -20,6 +20,7 @@ package org.apache.hadoop.fs; import java.io.Closeable; import java.io.FileNotFoundException; import java.io.IOException; +import java.lang.ref.WeakReference; import java.net.URI; import java.net.URISyntaxException; import java.security.PrivilegedExceptionAction; @@ -31,6 +32,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.IdentityHashMap; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -2502,28 +2504,149 @@ public abstract class FileSystem extends Configured implements Closeable { } } + /** + * Tracks statistics about how many reads, writes, and so forth have been + * done in a FileSystem. + * + * Since there is only one of these objects per FileSystem, there will + * typically be many threads writing to this object. Almost every operation + * on an open file will involve a write to this object. In contrast, reading + * statistics is done infrequently by most programs, and not at all by others. + * Hence, this is optimized for writes. + * + * Each thread writes to its own thread-local area of memory. This removes + * contention and allows us to scale up to many, many threads. To read + * statistics, the reader thread totals up the contents of all of the + * thread-local data areas. + */ public static final class Statistics { + /** + * Statistics data. + * + * There is only a single writer to thread-local StatisticsData objects. + * Hence, volatile is adequate here-- we do not need AtomicLong or similar + * to prevent lost updates. + * The Java specification guarantees that updates to volatile longs will + * be perceived as atomic with respect to other threads, which is all we + * need. + */ + private static class StatisticsData { + volatile long bytesRead; + volatile long bytesWritten; + volatile int readOps; + volatile int largeReadOps; + volatile int writeOps; + /** + * Stores a weak reference to the thread owning this StatisticsData. + * This allows us to remove StatisticsData objects that pertain to + * threads that no longer exist. + */ + final WeakReference owner; + + StatisticsData(WeakReference owner) { + this.owner = owner; + } + + /** + * Add another StatisticsData object to this one. + */ + void add(StatisticsData other) { + this.bytesRead += other.bytesRead; + this.bytesWritten += other.bytesWritten; + this.readOps += other.readOps; + this.largeReadOps += other.largeReadOps; + this.writeOps += other.writeOps; + } + + /** + * Negate the values of all statistics. + */ + void negate() { + this.bytesRead = -this.bytesRead; + this.bytesWritten = -this.bytesWritten; + this.readOps = -this.readOps; + this.largeReadOps = -this.largeReadOps; + this.writeOps = -this.writeOps; + } + + @Override + public String toString() { + return bytesRead + " bytes read, " + bytesWritten + " bytes written, " + + readOps + " read ops, " + largeReadOps + " large read ops, " + + writeOps + " write ops"; + } + } + + private interface StatisticsAggregator { + void accept(StatisticsData data); + T aggregate(); + } + private final String scheme; - private AtomicLong bytesRead = new AtomicLong(); - private AtomicLong bytesWritten = new AtomicLong(); - private AtomicInteger readOps = new AtomicInteger(); - private AtomicInteger largeReadOps = new AtomicInteger(); - private AtomicInteger writeOps = new AtomicInteger(); + + /** + * rootData is data that doesn't belong to any thread, but will be added + * to the totals. This is useful for making copies of Statistics objects, + * and for storing data that pertains to threads that have been garbage + * collected. Protected by the Statistics lock. + */ + private final StatisticsData rootData; + + /** + * Thread-local data. + */ + private final ThreadLocal threadData; + /** + * List of all thread-local data areas. Protected by the Statistics lock. + */ + private LinkedList allData; + public Statistics(String scheme) { this.scheme = scheme; + this.rootData = new StatisticsData(null); + this.threadData = new ThreadLocal(); + this.allData = null; } /** * Copy constructor. * - * @param st - * The input Statistics object which is cloned. + * @param other The input Statistics object which is cloned. */ - public Statistics(Statistics st) { - this.scheme = st.scheme; - this.bytesRead = new AtomicLong(st.bytesRead.longValue()); - this.bytesWritten = new AtomicLong(st.bytesWritten.longValue()); + public Statistics(Statistics other) { + this.scheme = other.scheme; + this.rootData = new StatisticsData(null); + other.visitAll(new StatisticsAggregator() { + @Override + public void accept(StatisticsData data) { + rootData.add(data); + } + + public Void aggregate() { + return null; + } + }); + this.threadData = new ThreadLocal(); + } + + /** + * Get or create the thread-local data associated with the current thread. + */ + private StatisticsData getThreadData() { + StatisticsData data = threadData.get(); + if (data == null) { + data = new StatisticsData( + new WeakReference(Thread.currentThread())); + threadData.set(data); + synchronized(this) { + if (allData == null) { + allData = new LinkedList(); + } + allData.add(data); + } + } + return data; } /** @@ -2531,7 +2654,7 @@ public abstract class FileSystem extends Configured implements Closeable { * @param newBytes the additional bytes read */ public void incrementBytesRead(long newBytes) { - bytesRead.getAndAdd(newBytes); + getThreadData().bytesRead += newBytes; } /** @@ -2539,7 +2662,7 @@ public abstract class FileSystem extends Configured implements Closeable { * @param newBytes the additional bytes written */ public void incrementBytesWritten(long newBytes) { - bytesWritten.getAndAdd(newBytes); + getThreadData().bytesWritten += newBytes; } /** @@ -2547,7 +2670,7 @@ public abstract class FileSystem extends Configured implements Closeable { * @param count number of read operations */ public void incrementReadOps(int count) { - readOps.getAndAdd(count); + getThreadData().readOps += count; } /** @@ -2555,7 +2678,7 @@ public abstract class FileSystem extends Configured implements Closeable { * @param count number of large read operations */ public void incrementLargeReadOps(int count) { - largeReadOps.getAndAdd(count); + getThreadData().largeReadOps += count; } /** @@ -2563,7 +2686,38 @@ public abstract class FileSystem extends Configured implements Closeable { * @param count number of write operations */ public void incrementWriteOps(int count) { - writeOps.getAndAdd(count); + getThreadData().writeOps += count; + } + + /** + * Apply the given aggregator to all StatisticsData objects associated with + * this Statistics object. + * + * For each StatisticsData object, we will call accept on the visitor. + * Finally, at the end, we will call aggregate to get the final total. + * + * @param The visitor to use. + * @return The total. + */ + private synchronized T visitAll(StatisticsAggregator visitor) { + visitor.accept(rootData); + if (allData != null) { + for (Iterator iter = allData.iterator(); + iter.hasNext(); ) { + StatisticsData data = iter.next(); + visitor.accept(data); + if (data.owner.get() == null) { + /* + * If the thread that created this thread-local data no + * longer exists, remove the StatisticsData from our list + * and fold the values into rootData. + */ + rootData.add(data); + iter.remove(); + } + } + } + return visitor.aggregate(); } /** @@ -2571,7 +2725,18 @@ public abstract class FileSystem extends Configured implements Closeable { * @return the number of bytes */ public long getBytesRead() { - return bytesRead.get(); + return visitAll(new StatisticsAggregator() { + private long bytesRead = 0; + + @Override + public void accept(StatisticsData data) { + bytesRead += data.bytesRead; + } + + public Long aggregate() { + return bytesRead; + } + }); } /** @@ -2579,7 +2744,18 @@ public abstract class FileSystem extends Configured implements Closeable { * @return the number of bytes */ public long getBytesWritten() { - return bytesWritten.get(); + return visitAll(new StatisticsAggregator() { + private long bytesWritten = 0; + + @Override + public void accept(StatisticsData data) { + bytesWritten += data.bytesWritten; + } + + public Long aggregate() { + return bytesWritten; + } + }); } /** @@ -2587,7 +2763,19 @@ public abstract class FileSystem extends Configured implements Closeable { * @return number of read operations */ public int getReadOps() { - return readOps.get() + largeReadOps.get(); + return visitAll(new StatisticsAggregator() { + private int readOps = 0; + + @Override + public void accept(StatisticsData data) { + readOps += data.readOps; + readOps += data.largeReadOps; + } + + public Integer aggregate() { + return readOps; + } + }); } /** @@ -2596,7 +2784,18 @@ public abstract class FileSystem extends Configured implements Closeable { * @return number of large read operations */ public int getLargeReadOps() { - return largeReadOps.get(); + return visitAll(new StatisticsAggregator() { + private int largeReadOps = 0; + + @Override + public void accept(StatisticsData data) { + largeReadOps += data.largeReadOps; + } + + public Integer aggregate() { + return largeReadOps; + } + }); } /** @@ -2605,22 +2804,70 @@ public abstract class FileSystem extends Configured implements Closeable { * @return number of write operations */ public int getWriteOps() { - return writeOps.get(); + return visitAll(new StatisticsAggregator() { + private int writeOps = 0; + + @Override + public void accept(StatisticsData data) { + writeOps += data.writeOps; + } + + public Integer aggregate() { + return writeOps; + } + }); } + @Override public String toString() { - return bytesRead + " bytes read, " + bytesWritten + " bytes written, " - + readOps + " read ops, " + largeReadOps + " large read ops, " - + writeOps + " write ops"; + return visitAll(new StatisticsAggregator() { + private StatisticsData total = new StatisticsData(null); + + @Override + public void accept(StatisticsData data) { + total.add(data); + } + + public String aggregate() { + return total.toString(); + } + }); } - + /** - * Reset the counts of bytes to 0. + * Resets all statistics to 0. + * + * In order to reset, we add up all the thread-local statistics data, and + * set rootData to the negative of that. + * + * This may seem like a counterintuitive way to reset the statsitics. Why + * can't we just zero out all the thread-local data? Well, thread-local + * data can only be modified by the thread that owns it. If we tried to + * modify the thread-local data from this thread, our modification might get + * interleaved with a read-modify-write operation done by the thread that + * owns the data. That would result in our update getting lost. + * + * The approach used here avoids this problem because it only ever reads + * (not writes) the thread-local data. Both reads and writes to rootData + * are done under the lock, so we're free to modify rootData from any thread + * that holds the lock. */ public void reset() { - bytesWritten.set(0); - bytesRead.set(0); + visitAll(new StatisticsAggregator() { + private StatisticsData total = new StatisticsData(null); + + @Override + public void accept(StatisticsData data) { + total.add(data); + } + + public Void aggregate() { + total.negate(); + rootData.add(total); + return null; + } + }); } /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FCStatisticsBaseTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FCStatisticsBaseTest.java index 2718291f6c2..cc80e7ced87 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FCStatisticsBaseTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FCStatisticsBaseTest.java @@ -27,6 +27,8 @@ import org.apache.hadoop.fs.FileSystem.Statistics; import org.junit.Assert; import org.junit.Test; +import com.google.common.util.concurrent.Uninterruptibles; + import static org.apache.hadoop.fs.FileContextTestHelper.*; /** @@ -44,6 +46,38 @@ public abstract class FCStatisticsBaseTest { //fc should be set appropriately by the deriving test. protected static FileContext fc = null; + @Test(timeout=60000) + public void testStatisticsOperations() throws Exception { + final Statistics stats = new Statistics("file"); + Assert.assertEquals(0L, stats.getBytesRead()); + Assert.assertEquals(0L, stats.getBytesWritten()); + Assert.assertEquals(0, stats.getWriteOps()); + stats.incrementBytesWritten(1000); + Assert.assertEquals(1000L, stats.getBytesWritten()); + Assert.assertEquals(0, stats.getWriteOps()); + stats.incrementWriteOps(123); + Assert.assertEquals(123, stats.getWriteOps()); + + Thread thread = new Thread() { + @Override + public void run() { + stats.incrementWriteOps(1); + } + }; + thread.start(); + Uninterruptibles.joinUninterruptibly(thread); + Assert.assertEquals(124, stats.getWriteOps()); + // Test copy constructor and reset function + Statistics stats2 = new Statistics(stats); + stats.reset(); + Assert.assertEquals(0, stats.getWriteOps()); + Assert.assertEquals(0L, stats.getBytesWritten()); + Assert.assertEquals(0L, stats.getBytesRead()); + Assert.assertEquals(124, stats2.getWriteOps()); + Assert.assertEquals(1000L, stats2.getBytesWritten()); + Assert.assertEquals(0L, stats2.getBytesRead()); + } + @Test public void testStatistics() throws IOException, URISyntaxException { URI fsUri = getFsUri();