From f349c7692d38642afd31ce7ffc6e1960f0781743 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Thu, 29 Dec 2016 15:10:36 +0800 Subject: [PATCH] HDFS-11251. ConcurrentModificationException during DataNode#refreshVolumes. (Manoj Govindassamy via lei) (cherry picked from commit e9f1396834174646a8d7aa8fc6c4a4f724ca5b28) --- .../hadoop/hdfs/server/common/Storage.java | 6 +- .../datanode/BlockPoolSliceStorage.java | 2 +- .../hdfs/server/datanode/DataStorage.java | 2 +- .../datanode/TestDataNodeHotSwapVolumes.java | 154 ++++++++++++++++-- 4 files changed, 150 insertions(+), 14 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java index 6f1ee19d6ad..009d0f03e57 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java @@ -32,6 +32,7 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Properties; +import java.util.concurrent.CopyOnWriteArrayList; import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; @@ -121,8 +122,9 @@ public abstract class Storage extends StorageInfo { public StorageDirType getStorageDirType(); public boolean isOfType(StorageDirType type); } - - protected List storageDirs = new ArrayList(); + + protected List storageDirs = + new CopyOnWriteArrayList(); private class DirIterator implements Iterator { final StorageDirType dirType; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java index fd90ae921aa..81fce6f621b 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java @@ -296,7 +296,7 @@ public class BlockPoolSliceStorage extends Storage { it.hasNext(); ) { StorageDirectory sd = it.next(); if (sd.getRoot().getAbsoluteFile().equals(absPathToRemove)) { - it.remove(); + this.storageDirs.remove(sd); break; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java index 0e6b339db00..0a8f1fef3e2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java @@ -519,7 +519,7 @@ public class DataStorage extends Storage { bpsStorage.remove(bpRoot.getAbsoluteFile()); } - it.remove(); + this.storageDirs.remove(sd); try { sd.unlock(); } catch (IOException e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java index e36b744cecf..5d4543d4b26 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java @@ -47,7 +47,9 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Time; import 
org.junit.After; +import org.junit.Assert; import org.junit.Test; import java.io.File; @@ -62,8 +64,10 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -91,6 +95,7 @@ public class TestDataNodeHotSwapVolumes { private static final Log LOG = LogFactory.getLog( TestDataNodeHotSwapVolumes.class); private static final int BLOCK_SIZE = 512; + private static final int DEFAULT_STORAGES_PER_DATANODE = 2; private MiniDFSCluster cluster; @After @@ -100,6 +105,11 @@ public class TestDataNodeHotSwapVolumes { private void startDFSCluster(int numNameNodes, int numDataNodes) throws IOException { + startDFSCluster(numNameNodes, numDataNodes, DEFAULT_STORAGES_PER_DATANODE); + } + + private void startDFSCluster(int numNameNodes, int numDataNodes, + int storagePerDataNode) throws IOException { shutdown(); Configuration conf = new Configuration(); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); @@ -121,6 +131,7 @@ public class TestDataNodeHotSwapVolumes { cluster = new MiniDFSCluster.Builder(conf) .nnTopology(nnTopology) .numDataNodes(numDataNodes) + .storagesPerDatanode(storagePerDataNode) .build(); cluster.waitActive(); } @@ -279,7 +290,12 @@ public class TestDataNodeHotSwapVolumes { /** Add volumes to the first DataNode. 
*/ private void addVolumes(int numNewVolumes) - throws ReconfigurationException, IOException { + throws InterruptedException, IOException, ReconfigurationException { + addVolumes(numNewVolumes, new CountDownLatch(0)); + } + + private void addVolumes(int numNewVolumes, CountDownLatch waitLatch) + throws ReconfigurationException, IOException, InterruptedException { File dataDir = new File(cluster.getDataDirectory()); DataNode dn = cluster.getDataNodes().get(0); // First DataNode. Configuration conf = dn.getConf(); @@ -311,6 +327,9 @@ public class TestDataNodeHotSwapVolumes { dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newDataDir), is(conf.get(DFS_DATANODE_DATA_DIR_KEY))); + // Await on the latch for needed operations to complete + waitLatch.await(); + // Verify the configuration value is appropriately set. String[] effectiveDataDirs = conf.get(DFS_DATANODE_DATA_DIR_KEY).split(","); String[] expectDataDirs = newDataDir.split(","); @@ -398,23 +417,34 @@ public class TestDataNodeHotSwapVolumes { throws IOException, InterruptedException, TimeoutException, ReconfigurationException { startDFSCluster(1, 1); + int numVolumes = cluster.getStoragesPerDatanode(); String bpid = cluster.getNamesystem().getBlockPoolId(); Path testFile = new Path("/test"); - createFile(testFile, 4); // Each volume has 2 blocks. - addVolumes(2); + // Each volume has 2 blocks + int initialBlockCount = numVolumes * 2; + createFile(testFile, initialBlockCount); + + int newVolumeCount = 5; + addVolumes(newVolumeCount); + numVolumes += newVolumeCount; + + int additionalBlockCount = 9; + int totalBlockCount = initialBlockCount + additionalBlockCount; // Continue to write the same file, thus the new volumes will have blocks. - DFSTestUtil.appendFile(cluster.getFileSystem(), testFile, BLOCK_SIZE * 8); - verifyFileLength(cluster.getFileSystem(), testFile, 8 + 4); - // After appending data, there should be [2, 2, 4, 4] blocks in each volume - // respectively. 
- List expectedNumBlocks = Arrays.asList(2, 2, 4, 4); + DFSTestUtil.appendFile(cluster.getFileSystem(), testFile, + BLOCK_SIZE * additionalBlockCount); + verifyFileLength(cluster.getFileSystem(), testFile, totalBlockCount); + + // After appending data, each new volume added should + // have 1 block each. + List expectedNumBlocks = Arrays.asList(1, 1, 1, 1, 1, 4, 4); List> blockReports = cluster.getAllBlockReports(bpid); assertEquals(1, blockReports.size()); // 1 DataNode - assertEquals(4, blockReports.get(0).size()); // 4 volumes + assertEquals(numVolumes, blockReports.get(0).size()); // 7 volumes Map dnReport = blockReports.get(0); List actualNumBlocks = new ArrayList(); @@ -425,6 +455,110 @@ public class TestDataNodeHotSwapVolumes { assertEquals(expectedNumBlocks, actualNumBlocks); } + @Test(timeout=180000) + public void testAddVolumesConcurrently() + throws IOException, InterruptedException, TimeoutException, + ReconfigurationException { + startDFSCluster(1, 1, 10); + int numVolumes = cluster.getStoragesPerDatanode(); + String blockPoolId = cluster.getNamesystem().getBlockPoolId(); + Path testFile = new Path("/test"); + + // Each volume has 2 blocks + int initialBlockCount = numVolumes * 2; + createFile(testFile, initialBlockCount); + + final DataNode dn = cluster.getDataNodes().get(0); + final FsDatasetSpi data = dn.data; + dn.data = Mockito.spy(data); + + final int newVolumeCount = 40; + final List addVolumeDelayedThreads = new ArrayList<>(); + final AtomicBoolean addVolumeError = new AtomicBoolean(false); + final AtomicBoolean listStorageError = new AtomicBoolean(false); + final CountDownLatch addVolumeCompletionLatch = + new CountDownLatch(newVolumeCount); + + // Thread to list all storage available at DataNode, + // when the volumes are being added in parallel. 
+    // Thread that repeatedly lists the DataNode's storage directories while
+    // the volumes are being added in parallel. It keeps polling until every
+    // asynchronous addVolume call has counted the latch down to zero.
+    final Thread listStorageThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        // The latch is created with newVolumeCount, so looping while the
+        // count differs from newVolumeCount would normally end before the
+        // first listing, or never end once a count down has happened.
+        // Loop while add operations are still outstanding instead.
+        while (addVolumeCompletionLatch.getCount() > 0) {
+          int i = 0;
+          while (i++ < 1000) {
+            try {
+              dn.getStorage().listStorageDirectories();
+            } catch (Exception e) {
+              listStorageError.set(true);
+              // Pass the exception as well so the stack trace is logged.
+              LOG.error("Error listing storage: " + e, e);
+            }
+          }
+        }
+      }
+    });
+    // Daemon, so that a failure before the latch reaches zero cannot leave
+    // this polling thread keeping the JVM alive.
+    listStorageThread.setDaemon(true);
+    listStorageThread.start();
+
+    // FsDatasetImpl addVolume mocked to perform the operation asynchronously:
+    // each invocation hands the real call to a new thread and returns at once.
+    // NOTE(review): addVolumeDelayedThreads is a plain ArrayList; if the
+    // DataNode invokes addVolume from several threads, the add() below is
+    // unsynchronized - confirm against DataNode#refreshVolumes.
+    doAnswer(new Answer<Object>() {
+      @Override
+      public Object answer(final InvocationOnMock invocationOnMock)
+          throws Throwable {
+        final Random r = new Random();
+        Thread addVolThread =
+            new Thread(new Runnable() {
+              @Override
+              public void run() {
+                try {
+                  r.setSeed(Time.now());
+                  // Let 50% of add volume operations
+                  // start after an initial delay.
+                  if (r.nextInt(10) > 4) {
+                    int s = r.nextInt(10) + 1;
+                    Thread.sleep(s * 100);
+                  }
+                  invocationOnMock.callRealMethod();
+                } catch (Throwable throwable) {
+                  addVolumeError.set(true);
+                  // Pass the throwable as well so the stack trace is logged.
+                  LOG.error("Error adding volume: " + throwable, throwable);
+                } finally {
+                  // Always count down, even on failure, so that the waiters
+                  // on the latch are released.
+                  addVolumeCompletionLatch.countDown();
+                }
+              }
+            });
+        addVolumeDelayedThreads.add(addVolThread);
+        addVolThread.start();
+        return null;
+      }
+    }).when(dn.data).addVolume(any(StorageLocation.class), any(List.class));
+
+    // addVolumes() awaits the latch before verifying the configuration.
+    addVolumes(newVolumeCount, addVolumeCompletionLatch);
+    numVolumes += newVolumeCount;
+
+    // Wait for all addVolume and listStorage Threads to complete
+    for (Thread t : addVolumeDelayedThreads) {
+      t.join();
+    }
+    listStorageThread.join();
+
+    // Verify errors while adding volumes and listing storage directories
+    Assert.assertFalse("Error adding volumes!", addVolumeError.get());
+    Assert.assertFalse("Error listing storage!", listStorageError.get());
+
+    int additionalBlockCount = 9;
+    int totalBlockCount = initialBlockCount + additionalBlockCount;
+
+    // Continue to write the same file, thus the new volumes will have blocks.
+ DFSTestUtil.appendFile(cluster.getFileSystem(), testFile, + BLOCK_SIZE * additionalBlockCount); + verifyFileLength(cluster.getFileSystem(), testFile, totalBlockCount); + + List> blockReports = + cluster.getAllBlockReports(blockPoolId); + assertEquals(1, blockReports.size()); + assertEquals(numVolumes, blockReports.get(0).size()); + } + @Test(timeout=60000) public void testAddVolumesToFederationNN() throws IOException, TimeoutException, InterruptedException, @@ -778,7 +912,7 @@ public class TestDataNodeHotSwapVolumes { } /** - * Verify that {@link DataNode#checkDiskErrors()} removes all metadata in + * Verify that {@link DataNode#checkDiskError()} removes all metadata in * DataNode upon a volume failure. Thus we can run reconfig on the same * configuration to reload the new volume on the same directory as the failed one. */