From f45365f2016b38fdef0cfdee00aea04e3cb6873b Mon Sep 17 00:00:00 2001 From: litao Date: Tue, 27 Apr 2021 21:39:25 +0800 Subject: [PATCH] HDFS-15975. Use LongAdder instead of AtomicLong for branch-3.3 (#2940) Signed-off-by: Takanobu Asanuma --- .../metrics2/lib/MutableCounterLong.java | 10 +++---- .../hadoop/hdfs/DFSHedgedReadMetrics.java | 14 +++++----- .../hadoop/hdfs/DFSOpsCountStatistics.java | 20 ++++++------- .../fsdataset/impl/FsDatasetCache.java | 28 +++++++++---------- .../fsdataset/impl/FsDatasetImpl.java | 2 +- .../hdfs/server/namenode/FSEditLog.java | 10 +++---- .../server/namenode/FSNamesystemLock.java | 13 +++++---- .../org/apache/hadoop/hdfs/TestPread.java | 8 +++--- 8 files changed, 53 insertions(+), 52 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableCounterLong.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableCounterLong.java index d3dec2e4d06..efaf8a14eaf 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableCounterLong.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableCounterLong.java @@ -23,7 +23,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.metrics2.MetricsInfo; import org.apache.hadoop.metrics2.MetricsRecordBuilder; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; /** * A mutable long counter @@ -32,11 +32,11 @@ import java.util.concurrent.atomic.AtomicLong; @InterfaceStability.Evolving public class MutableCounterLong extends MutableCounter { - private AtomicLong value = new AtomicLong(); + private final LongAdder value = new LongAdder(); public MutableCounterLong(MetricsInfo info, long initValue) { super(info); - this.value.set(initValue); + this.value.add(initValue); } @Override @@ -49,12 +49,12 @@ public class MutableCounterLong extends MutableCounter { * @param delta of the increment */ public void incr(long delta) { - value.addAndGet(delta); + value.add(delta); setChanged(); } public long value() { - return value.get(); + return value.longValue(); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSHedgedReadMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSHedgedReadMetrics.java index 2a228e8d018..1cd9e82cebb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSHedgedReadMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSHedgedReadMetrics.java @@ -19,7 +19,7 @@ package org.apache.hadoop.hdfs; import org.apache.hadoop.classification.InterfaceAudience; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; /** * The client-side metrics for hedged read feature. @@ -28,20 +28,20 @@ import java.util.concurrent.atomic.AtomicLong; */ @InterfaceAudience.Private public class DFSHedgedReadMetrics { - public final AtomicLong hedgedReadOps = new AtomicLong(); - public final AtomicLong hedgedReadOpsWin = new AtomicLong(); - public final AtomicLong hedgedReadOpsInCurThread = new AtomicLong(); + public final LongAdder hedgedReadOps = new LongAdder(); + public final LongAdder hedgedReadOpsWin = new LongAdder(); + public final LongAdder hedgedReadOpsInCurThread = new LongAdder(); public void incHedgedReadOps() { - hedgedReadOps.incrementAndGet(); + hedgedReadOps.increment(); } public void incHedgedReadOpsInCurThread() { - hedgedReadOpsInCurThread.incrementAndGet(); + hedgedReadOpsInCurThread.increment(); } public void incHedgedReadWins() { - hedgedReadOpsWin.incrementAndGet(); + hedgedReadOpsWin.increment(); } public long getHedgedReadOps() { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOpsCountStatistics.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOpsCountStatistics.java index 2113ae5c635..d8f73bcec14 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOpsCountStatistics.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOpsCountStatistics.java @@ -26,7 +26,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.NoSuchElementException; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; /** * This storage statistics tracks how many times each DFS operation was issued. @@ -140,21 +140,21 @@ public class DFSOpsCountStatistics extends StorageStatistics { public static final String NAME = "DFSOpsCountStatistics"; - private final Map opsCount = new EnumMap<>(OpType.class); + private final Map opsCount = new EnumMap<>(OpType.class); public DFSOpsCountStatistics() { super(NAME); for (OpType opType : OpType.values()) { - opsCount.put(opType, new AtomicLong(0)); + opsCount.put(opType, new LongAdder()); } } public void incrementOpCounter(OpType op) { - opsCount.get(op).addAndGet(1); + opsCount.get(op).increment(); } private class LongIterator implements Iterator { - private Iterator> iterator = + private final Iterator> iterator = opsCount.entrySet().iterator(); @Override @@ -167,9 +167,9 @@ public class DFSOpsCountStatistics extends StorageStatistics { if (!iterator.hasNext()) { throw new NoSuchElementException(); } - final Entry entry = iterator.next(); + final Entry entry = iterator.next(); return new LongStatistic(entry.getKey().getSymbol(), - entry.getValue().get()); + entry.getValue().longValue()); } @Override @@ -191,7 +191,7 @@ public class DFSOpsCountStatistics extends StorageStatistics { @Override public Long getLong(String key) { final OpType type = OpType.fromSymbol(key); - return type == null ? null : opsCount.get(type).get(); + return type == null ? null : opsCount.get(type).longValue(); } @Override @@ -201,8 +201,8 @@ public class DFSOpsCountStatistics extends StorageStatistics { @Override public void reset() { - for (AtomicLong count : opsCount.values()) { - count.set(0); + for (LongAdder count : opsCount.values()) { + count.reset(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java index b6a57fdeffa..facace28604 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java @@ -42,7 +42,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.time.DurationFormatUtils; @@ -120,7 +120,7 @@ public class FsDatasetCache { private final HashMap mappableBlockMap = new HashMap(); - private final AtomicLong numBlocksCached = new AtomicLong(0); + private final LongAdder numBlocksCached = new LongAdder(); private final FsDatasetImpl dataset; @@ -143,11 +143,11 @@ public class FsDatasetCache { /** * Number of cache commands that could not be completed successfully */ - final AtomicLong numBlocksFailedToCache = new AtomicLong(0); + final LongAdder numBlocksFailedToCache = new LongAdder(); /** * Number of uncache commands that could not be completed successfully */ - final AtomicLong numBlocksFailedToUncache = new AtomicLong(0); + final LongAdder numBlocksFailedToUncache = new LongAdder(); public FsDatasetCache(FsDatasetImpl dataset) throws IOException { this.dataset = dataset; @@ -204,7 +204,7 @@ public class FsDatasetCache { for (Map.Entry entry : entrySet) { mappableBlockMap.put(entry.getKey(), new Value(keyToMappableBlock.get(entry.getKey()), State.CACHED)); - numBlocksCached.addAndGet(1); + numBlocksCached.increment(); dataset.datanode.getMetrics().incrBlocksCached(1); } } @@ -278,7 +278,7 @@ public class FsDatasetCache { LOG.debug("Block with id {}, pool {} already exists in the " + "FsDatasetCache with state {}", blockId, bpid, prevValue.state ); - numBlocksFailedToCache.incrementAndGet(); + numBlocksFailedToCache.increment(); return; } mappableBlockMap.put(key, new Value(null, State.CACHING)); @@ -301,7 +301,7 @@ public class FsDatasetCache { LOG.debug("Block with id {}, pool {} does not need to be uncached, " + "because it is not currently in the mappableBlockMap.", blockId, bpid); - numBlocksFailedToUncache.incrementAndGet(); + numBlocksFailedToUncache.increment(); return; } switch (prevValue.state) { @@ -331,7 +331,7 @@ public class FsDatasetCache { default: LOG.debug("Block with id {}, pool {} does not need to be uncached, " + "because it is in state {}.", blockId, bpid, prevValue.state); - numBlocksFailedToUncache.incrementAndGet(); + numBlocksFailedToUncache.increment(); break; } } @@ -469,7 +469,7 @@ public class FsDatasetCache { dataset.datanode. getShortCircuitRegistry().processBlockMlockEvent(key); } - numBlocksCached.addAndGet(1); + numBlocksCached.increment(); dataset.datanode.getMetrics().incrBlocksCached(1); success = true; } finally { @@ -482,7 +482,7 @@ public class FsDatasetCache { LOG.debug("Caching of {} was aborted. We are now caching only {} " + "bytes in total.", key, cacheLoader.getCacheUsed()); IOUtils.closeQuietly(mappableBlock); - numBlocksFailedToCache.incrementAndGet(); + numBlocksFailedToCache.increment(); synchronized (FsDatasetCache.this) { mappableBlockMap.remove(key); @@ -561,7 +561,7 @@ public class FsDatasetCache { } long newUsedBytes = cacheLoader. release(key, value.mappableBlock.getLength()); - numBlocksCached.addAndGet(-1); + numBlocksCached.decrement(); dataset.datanode.getMetrics().incrBlocksUncached(1); if (revocationTimeMs != 0) { LOG.debug("Uncaching of {} completed. usedBytes = {}", @@ -607,15 +607,15 @@ public class FsDatasetCache { } public long getNumBlocksFailedToCache() { - return numBlocksFailedToCache.get(); + return numBlocksFailedToCache.longValue(); } public long getNumBlocksFailedToUncache() { - return numBlocksFailedToUncache.get(); + return numBlocksFailedToUncache.longValue(); } public long getNumBlocksCached() { - return numBlocksCached.get(); + return numBlocksCached.longValue(); } public synchronized boolean isCached(String bpid, long blockId) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 92b1dac2f8d..bcf337a20a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -2264,7 +2264,7 @@ class FsDatasetImpl implements FsDatasetSpi { success = true; } finally { if (!success) { - cacheManager.numBlocksFailedToCache.incrementAndGet(); + cacheManager.numBlocksFailedToCache.increment(); } } blockFileName = info.getBlockURI().toString(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java index 2ef3a028acd..0a9731580ee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java @@ -27,7 +27,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -182,7 +182,7 @@ public class FSEditLog implements LogsPurgeable { // these are statistics counters. private long numTransactions; // number of transactions - private final AtomicLong numTransactionsBatchedInSync = new AtomicLong(); + private final LongAdder numTransactionsBatchedInSync = new LongAdder(); private long totalTimeTransactions; // total time for all transactions private NameNodeMetrics metrics; @@ -730,7 +730,7 @@ public class FSEditLog implements LogsPurgeable { if (metrics != null) { // Metrics non-null only when used inside name node metrics.addSync(elapsed); metrics.incrTransactionsBatchedInSync(editsBatchedInSync); - numTransactionsBatchedInSync.addAndGet(editsBatchedInSync); + numTransactionsBatchedInSync.add(editsBatchedInSync); } } finally { @@ -770,7 +770,7 @@ public class FSEditLog implements LogsPurgeable { .append(" Total time for transactions(ms): ") .append(totalTimeTransactions) .append(" Number of transactions batched in Syncs: ") - .append(numTransactionsBatchedInSync.get()) + .append(numTransactionsBatchedInSync.longValue()) .append(" Number of syncs: ") .append(editLogStream.getNumSync()) .append(" SyncTimes(ms): ") @@ -1402,7 +1402,7 @@ public class FSEditLog implements LogsPurgeable { numTransactions = 0; totalTimeTransactions = 0; - numTransactionsBatchedInSync.set(0L); + numTransactionsBatchedInSync.reset(); // TODO no need to link this back to storage anymore! // See HDFS-2174. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java index 6502c4c2b4d..f556219b324 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -112,12 +113,12 @@ class FSNamesystemLock { * The number of time the read lock * has been held longer than the threshold. */ - private final AtomicLong numReadLockLongHold = new AtomicLong(0); + private final LongAdder numReadLockLongHold = new LongAdder(); /** * The number of time the write lock * has been held for longer than the threshold. */ - private final AtomicLong numWriteLockLongHold = new AtomicLong(0); + private final LongAdder numWriteLockLongHold = new LongAdder(); @VisibleForTesting static final String OP_NAME_OTHER = "OTHER"; @@ -186,7 +187,7 @@ class FSNamesystemLock { final long readLockIntervalMs = TimeUnit.NANOSECONDS.toMillis(readLockIntervalNanos); if (needReport && readLockIntervalMs >= this.readLockReportingThresholdMs) { - numReadLockLongHold.incrementAndGet(); + numReadLockLongHold.increment(); LockHeldInfo localLockHeldInfo; do { localLockHeldInfo = longestReadLockHeldInfo.get(); @@ -264,7 +265,7 @@ class FSNamesystemLock { LogAction logAction = LogThrottlingHelper.DO_NOT_LOG; if (needReport && writeLockIntervalMs >= this.writeLockReportingThresholdMs) { - numWriteLockLongHold.incrementAndGet(); + numWriteLockLongHold.increment(); if (longestWriteLockHeldInfo.getIntervalMs() < writeLockIntervalMs) { longestWriteLockHeldInfo = new LockHeldInfo(currentTimeMs, writeLockIntervalMs, @@ -322,7 +323,7 @@ class FSNamesystemLock { * has been held longer than the threshold */ public long getNumOfReadLockLongHold() { - return numReadLockLongHold.get(); + return numReadLockLongHold.longValue(); } /** @@ -333,7 +334,7 @@ class FSNamesystemLock { * has been held longer than the threshold. */ public long getNumOfWriteLockLongHold() { - return numWriteLockLongHold.get(); + return numWriteLockLongHold.longValue(); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java index df6b7dc1254..8cc68147d77 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java @@ -401,9 +401,9 @@ public class TestPread { DFSClient dfsClient = fileSys.getClient(); DFSHedgedReadMetrics metrics = dfsClient.getHedgedReadMetrics(); // Metrics instance is static, so we need to reset counts from prior tests. - metrics.hedgedReadOps.set(0); - metrics.hedgedReadOpsWin.set(0); - metrics.hedgedReadOpsInCurThread.set(0); + metrics.hedgedReadOps.reset(); + metrics.hedgedReadOpsWin.reset(); + metrics.hedgedReadOpsInCurThread.reset(); try { Path file1 = new Path("hedgedReadMaxOut.dat"); @@ -590,7 +590,7 @@ public class TestPread { String filename = "/hedgedReadMaxOut.dat"; DFSHedgedReadMetrics metrics = dfsClient.getHedgedReadMetrics(); // Metrics instance is static, so we need to reset counts from prior tests. - metrics.hedgedReadOps.set(0); + metrics.hedgedReadOps.reset(); try { Path file = new Path(filename); output = fileSys.create(file, (short) 2);