From 26b51f3e2295b9a85ee1fc9a7f475cb3dc181933 Mon Sep 17 00:00:00 2001 From: Yiqun Lin Date: Thu, 28 Nov 2019 10:43:35 +0800 Subject: [PATCH] HDFS-14986. ReplicaCachingGetSpaceUsed throws ConcurrentModificationException. Contributed by Aiphago. (cherry picked from commit 2b452b4e6063072b2bec491edd3f412eb7ac21f3) --- .../apache/hadoop/fs/CachingGetSpaceUsed.java | 34 +++++++++++- .../datanode/fsdataset/FsDatasetSpi.java | 6 ++ .../fsdataset/impl/FsDatasetImpl.java | 12 ++-- .../impl/ReplicaCachingGetSpaceUsed.java | 1 + .../impl/TestReplicaCachingGetSpaceUsed.java | 55 +++++++++++++++++++ 5 files changed, 98 insertions(+), 10 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CachingGetSpaceUsed.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CachingGetSpaceUsed.java index 92476d77ddb..58dc82d2efb 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CachingGetSpaceUsed.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CachingGetSpaceUsed.java @@ -47,6 +47,7 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed { private final long jitter; private final String dirPath; private Thread refreshUsed; + private boolean shouldFirstRefresh; /** * This is the constructor used by the builder. @@ -79,16 +80,30 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed { this.refreshInterval = interval; this.jitter = jitter; this.used.set(initialUsed); + this.shouldFirstRefresh = true; } void init() { if (used.get() < 0) { used.set(0); + if (!shouldFirstRefresh) { + // Skip initial refresh operation, so we need to do first refresh + // operation immediately in refresh thread. + initRefeshThread(true); + return; + } refresh(); } + initRefeshThread(false); + } + /** + * RunImmediately should set true, if we skip the first refresh. + * @param runImmediately The param default should be false. + */ + private void initRefeshThread (boolean runImmediately) { if (refreshInterval > 0) { - refreshUsed = new Thread(new RefreshThread(this), + refreshUsed = new Thread(new RefreshThread(this, runImmediately), "refreshUsed-" + dirPath); refreshUsed.setDaemon(true); refreshUsed.start(); @@ -100,6 +115,14 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed { protected abstract void refresh(); + /** + * Reset that if we need to do the first refresh. + * @param shouldFirstRefresh The flag value to set. + */ + protected void setShouldFirstRefresh(boolean shouldFirstRefresh) { + this.shouldFirstRefresh = shouldFirstRefresh; + } + /** * @return an estimate of space used in the directory path. */ @@ -156,9 +179,11 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed { private static final class RefreshThread implements Runnable { final CachingGetSpaceUsed spaceUsed; + private boolean runImmediately; - RefreshThread(CachingGetSpaceUsed spaceUsed) { + RefreshThread(CachingGetSpaceUsed spaceUsed, boolean runImmediately) { this.spaceUsed = spaceUsed; + this.runImmediately = runImmediately; } @Override @@ -176,7 +201,10 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed { } // Make sure that after the jitter we didn't end up at 0. refreshInterval = Math.max(refreshInterval, 1); - Thread.sleep(refreshInterval); + if (!runImmediately) { + Thread.sleep(refreshInterval); + } + runImmediately = false; // update the used variable spaceUsed.refresh(); } catch (InterruptedException e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index 78a5cfc9676..578c3904635 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -661,5 +661,11 @@ public interface FsDatasetSpi extends FSDatasetMBean { */ AutoCloseableLock acquireDatasetLock(); + /** + * Deep copy the replica info belonging to given block pool. + * @param bpid Specified block pool id. + * @return A set of replica info. + * @throws IOException + */ Set deepCopyReplica(String bpid) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index ac92ae4fe8f..957b3066d8b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -198,16 +198,14 @@ class FsDatasetImpl implements FsDatasetSpi { } } - /** - * The deepCopyReplica call doesn't use the datasetock since it will lead the - * potential deadlock with the {@link FsVolumeList#addBlockPool} call. - */ @Override public Set deepCopyReplica(String bpid) throws IOException { - Set replicas = - new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.EMPTY_SET - : volumeMap.replicas(bpid)); + Set replicas = null; + try (AutoCloseableLock lock = datasetLock.acquire()) { + replicas = new HashSet<>(volumeMap.replicas(bpid) == null ? Collections. + EMPTY_SET : volumeMap.replicas(bpid)); + } return Collections.unmodifiableSet(replicas); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaCachingGetSpaceUsed.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaCachingGetSpaceUsed.java index 2c1c16e5dd2..5acc3c04279 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaCachingGetSpaceUsed.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaCachingGetSpaceUsed.java @@ -59,6 +59,7 @@ public class ReplicaCachingGetSpaceUsed extends FSCachingGetSpaceUsed { public ReplicaCachingGetSpaceUsed(Builder builder) throws IOException { super(builder); + setShouldFirstRefresh(false); volume = builder.getVolume(); bpid = builder.getBpid(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaCachingGetSpaceUsed.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaCachingGetSpaceUsed.java index 45a3916f60c..6abf5238682 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaCachingGetSpaceUsed.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaCachingGetSpaceUsed.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; +import org.apache.commons.lang.math.RandomUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CachingGetSpaceUsed; import org.apache.hadoop.fs.FSDataOutputStream; @@ -27,8 +28,11 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.Replica; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.io.IOUtils; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -36,6 +40,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.util.List; +import java.util.Set; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY; import static org.junit.Assert.assertEquals; @@ -145,4 +150,54 @@ public class TestReplicaCachingGetSpaceUsed { fs.delete(new Path("/testReplicaCachingGetSpaceUsedByRBWReplica"), true); } + + @Test(timeout = 15000) + public void testFsDatasetImplDeepCopyReplica() { + FsDatasetSpi fsDataset = dataNode.getFSDataset(); + ModifyThread modifyThread = new ModifyThread(); + modifyThread.start(); + String bpid = cluster.getNamesystem(0).getBlockPoolId(); + int retryTimes = 10; + + while (retryTimes > 0) { + try { + Set replicas = fsDataset.deepCopyReplica(bpid); + if (replicas.size() > 0) { + retryTimes--; + } + } catch (IOException e) { + modifyThread.setShouldRun(false); + Assert.fail("Encounter IOException when deep copy replica."); + } + } + modifyThread.setShouldRun(false); + } + + private class ModifyThread extends Thread { + private boolean shouldRun = true; + + @Override + public void run() { + FSDataOutputStream os = null; + while (shouldRun) { + try { + int id = RandomUtils.nextInt(); + os = fs.create(new Path("/testFsDatasetImplDeepCopyReplica/" + id)); + byte[] bytes = new byte[2048]; + InputStream is = new ByteArrayInputStream(bytes); + IOUtils.copyBytes(is, os, bytes.length); + os.hsync(); + os.close(); + } catch (IOException e) {} + } + + try { + fs.delete(new Path("/testFsDatasetImplDeepCopyReplica"), true); + } catch (IOException e) {} + } + + private void setShouldRun(boolean shouldRun) { + this.shouldRun = shouldRun; + } + } } \ No newline at end of file