HDFS-11251. ConcurrentModificationException during DataNode#refreshVolumes. (Manoj Govindassamy via lei)

Committed by: Lei Xu, 2016-12-29 15:10:36 +08:00
parent 603f3ef138
commit e9f1396834
4 changed files with 150 additions and 14 deletions
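Background on the race this commit fixes: DataNode#refreshVolumes adds and removes entries in Storage#storageDirs while other threads (for example, a caller of listStorageDirectories during a block report) iterate the same list. A plain ArrayList iterator is fail-fast, so the reading thread dies with a ConcurrentModificationException. The sketch below is a minimal standalone illustration of that failure mode, not HDFS code; the class name and list contents are invented. Swapping in a CopyOnWriteArrayList, as this commit does, makes each iteration walk an immutable snapshot and the exception disappears.

    import java.util.ArrayList;
    import java.util.ConcurrentModificationException;
    import java.util.List;

    public class CmeSketch {
      public static void main(String[] args) throws InterruptedException {
        // A plain ArrayList, as Storage#storageDirs was before this commit.
        // Replacing it with new CopyOnWriteArrayList<>() removes the race.
        final List<String> dirs = new ArrayList<>();
        for (int i = 0; i < 100000; i++) {
          dirs.add("/data" + i);
        }
        // Writer: the refreshVolumes analogue, mutating the list concurrently.
        Thread writer = new Thread(() -> {
          for (int i = 0; i < 100000; i++) {
            dirs.add("/newVolume" + i);
          }
        });
        writer.start();
        try {
          // Reader: the listStorageDirectories analogue, iterating concurrently.
          long total = 0;
          for (String d : dirs) {
            total += d.length();
          }
          System.out.println("Iteration finished cleanly: " + total);
        } catch (ConcurrentModificationException e) {
          System.out.println("Reproduced the race: " + e);
        }
        writer.join();
      }
    }

Since this is a timing-dependent race, a single run may take either branch; the concurrency test added at the bottom of this commit hammers the same interleaving deliberately.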

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java

@@ -32,6 +32,7 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
+ import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
@@ -122,8 +123,9 @@ public abstract class Storage extends StorageInfo {
public StorageDirType getStorageDirType();
public boolean isOfType(StorageDirType type);
}
- protected List<StorageDirectory> storageDirs = new ArrayList<StorageDirectory>();
+ protected List<StorageDirectory> storageDirs =
+     new CopyOnWriteArrayList<StorageDirectory>();
private class DirIterator implements Iterator<StorageDirectory> {
final StorageDirType dirType;
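One consequence of the new list type explains the two hunks that follow: a CopyOnWriteArrayList iterator walks an immutable snapshot, so its Iterator#remove throws UnsupportedOperationException. Removal must instead go through the list itself, which is safe to call mid-iteration precisely because the iterator never sees the mutation. A small illustrative sketch of that rule (not HDFS code):

    import java.util.Iterator;
    import java.util.concurrent.CopyOnWriteArrayList;

    public class CowRemoveSketch {
      public static void main(String[] args) {
        CopyOnWriteArrayList<String> dirs = new CopyOnWriteArrayList<>();
        dirs.add("/data1");
        dirs.add("/data2");
        for (Iterator<String> it = dirs.iterator(); it.hasNext(); ) {
          String d = it.next();
          if (d.equals("/data1")) {
            // it.remove() would throw UnsupportedOperationException here;
            // removing through the list is allowed and does not disturb
            // the snapshot this iterator is reading.
            dirs.remove(d);
          }
        }
        System.out.println(dirs); // prints [/data2]
      }
    }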

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java

@@ -299,7 +299,7 @@ public class BlockPoolSliceStorage extends Storage {
it.hasNext(); ) {
StorageDirectory sd = it.next();
if (sd.getRoot().getAbsoluteFile().equals(absPathToRemove)) {
- it.remove();
+ this.storageDirs.remove(sd);
break;
}
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java

@@ -508,7 +508,7 @@ public class DataStorage extends Storage {
bpsStorage.remove(bpRoot.getAbsoluteFile());
}
- it.remove();
+ this.storageDirs.remove(sd);
try {
sd.unlock();
} catch (IOException e) {

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java

@@ -47,7 +47,9 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.test.GenericTestUtils;
+ import org.apache.hadoop.util.Time;
import org.junit.After;
+ import org.junit.Assert;
import org.junit.Test;
import java.io.File;
@@ -61,9 +63,11 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
+ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -91,6 +95,7 @@ public class TestDataNodeHotSwapVolumes {
private static final Log LOG = LogFactory.getLog(
TestDataNodeHotSwapVolumes.class);
private static final int BLOCK_SIZE = 512;
+ private static final int DEFAULT_STORAGES_PER_DATANODE = 2;
private MiniDFSCluster cluster;
@After
@@ -100,6 +105,11 @@ public class TestDataNodeHotSwapVolumes {
private void startDFSCluster(int numNameNodes, int numDataNodes)
throws IOException {
+   startDFSCluster(numNameNodes, numDataNodes, DEFAULT_STORAGES_PER_DATANODE);
+ }
+
+ private void startDFSCluster(int numNameNodes, int numDataNodes,
+     int storagePerDataNode) throws IOException {
shutdown();
Configuration conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
@@ -123,6 +133,7 @@ public class TestDataNodeHotSwapVolumes {
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(nnTopology)
.numDataNodes(numDataNodes)
+ .storagesPerDatanode(storagePerDataNode)
.build();
cluster.waitActive();
}
@@ -281,7 +292,12 @@ public class TestDataNodeHotSwapVolumes {
/** Add volumes to the first DataNode. */
private void addVolumes(int numNewVolumes)
- throws ReconfigurationException, IOException {
+ throws InterruptedException, IOException, ReconfigurationException {
+   addVolumes(numNewVolumes, new CountDownLatch(0));
+ }
+
+ private void addVolumes(int numNewVolumes, CountDownLatch waitLatch)
+     throws ReconfigurationException, IOException, InterruptedException {
File dataDir = new File(cluster.getDataDirectory());
DataNode dn = cluster.getDataNodes().get(0); // First DataNode.
Configuration conf = dn.getConf();
@@ -313,6 +329,9 @@ public class TestDataNodeHotSwapVolumes {
dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newDataDir),
is(conf.get(DFS_DATANODE_DATA_DIR_KEY)));
+ // Await on the latch for needed operations to complete
+ waitLatch.await();
// Verify the configuration value is appropriately set.
String[] effectiveDataDirs = conf.get(DFS_DATANODE_DATA_DIR_KEY).split(",");
String[] expectDataDirs = newDataDir.split(",");
@@ -400,23 +419,34 @@ public class TestDataNodeHotSwapVolumes {
throws IOException, InterruptedException, TimeoutException,
ReconfigurationException {
startDFSCluster(1, 1);
+ int numVolumes = cluster.getStoragesPerDatanode();
String bpid = cluster.getNamesystem().getBlockPoolId();
Path testFile = new Path("/test");
- createFile(testFile, 4); // Each volume has 2 blocks.
- addVolumes(2);
+ // Each volume has 2 blocks
+ int initialBlockCount = numVolumes * 2;
+ createFile(testFile, initialBlockCount);
+ int newVolumeCount = 5;
+ addVolumes(newVolumeCount);
+ numVolumes += newVolumeCount;
+ int additionalBlockCount = 9;
+ int totalBlockCount = initialBlockCount + additionalBlockCount;
// Continue to write the same file, thus the new volumes will have blocks.
- DFSTestUtil.appendFile(cluster.getFileSystem(), testFile, BLOCK_SIZE * 8);
- verifyFileLength(cluster.getFileSystem(), testFile, 8 + 4);
- // After appending data, there should be [2, 2, 4, 4] blocks in each volume
- // respectively.
- List<Integer> expectedNumBlocks = Arrays.asList(2, 2, 4, 4);
+ DFSTestUtil.appendFile(cluster.getFileSystem(), testFile,
+     BLOCK_SIZE * additionalBlockCount);
+ verifyFileLength(cluster.getFileSystem(), testFile, totalBlockCount);
+ // After appending data, each newly added volume should have 1 block.
+ List<Integer> expectedNumBlocks = Arrays.asList(1, 1, 1, 1, 1, 4, 4);
List<Map<DatanodeStorage, BlockListAsLongs>> blockReports =
cluster.getAllBlockReports(bpid);
assertEquals(1, blockReports.size()); // 1 DataNode
- assertEquals(4, blockReports.get(0).size()); // 4 volumes
+ assertEquals(numVolumes, blockReports.get(0).size()); // 7 volumes
Map<DatanodeStorage, BlockListAsLongs> dnReport =
blockReports.get(0);
List<Integer> actualNumBlocks = new ArrayList<Integer>();
@@ -427,6 +457,110 @@ public class TestDataNodeHotSwapVolumes {
assertEquals(expectedNumBlocks, actualNumBlocks);
}
+ @Test(timeout=180000)
+ public void testAddVolumesConcurrently()
+     throws IOException, InterruptedException, TimeoutException,
+     ReconfigurationException {
+   startDFSCluster(1, 1, 10);
+   int numVolumes = cluster.getStoragesPerDatanode();
+   String blockPoolId = cluster.getNamesystem().getBlockPoolId();
+   Path testFile = new Path("/test");
+   // Each volume has 2 blocks
+   int initialBlockCount = numVolumes * 2;
+   createFile(testFile, initialBlockCount);
+   DataNode dn = cluster.getDataNodes().get(0);
+   final FsDatasetSpi<? extends FsVolumeSpi> data = dn.data;
+   dn.data = Mockito.spy(data);
+   final int newVolumeCount = 40;
+   List<Thread> addVolumeDelayedThreads = new ArrayList<>();
+   AtomicBoolean addVolumeError = new AtomicBoolean(false);
+   AtomicBoolean listStorageError = new AtomicBoolean(false);
+   CountDownLatch addVolumeCompletionLatch =
+       new CountDownLatch(newVolumeCount);
+   // Thread to list all storage available at DataNode,
+   // when the volumes are being added in parallel.
+   final Thread listStorageThread = new Thread(new Runnable() {
+     @Override
+     public void run() {
+       // Keep listing until every pending addVolume has counted down.
+       while (addVolumeCompletionLatch.getCount() != 0) {
+         int i = 0;
+         while (i++ < 1000) {
+           try {
+             dn.getStorage().listStorageDirectories();
+           } catch (Exception e) {
+             listStorageError.set(true);
+             LOG.error("Error listing storage: " + e);
+           }
+         }
+       }
+     }
+   });
+   listStorageThread.start();
+   // FsDatasetImpl addVolume mocked to perform the operation asynchronously
+   doAnswer(new Answer<Object>() {
+     @Override
+     public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+       final Random r = new Random();
+       Thread addVolThread =
+           new Thread(new Runnable() {
+             @Override
+             public void run() {
+               try {
+                 r.setSeed(Time.now());
+                 // Let 50% of add volume operations
+                 // start after an initial delay.
+                 if (r.nextInt(10) > 4) {
+                   int s = r.nextInt(10) + 1;
+                   Thread.sleep(s * 100);
+                 }
+                 invocationOnMock.callRealMethod();
+               } catch (Throwable throwable) {
+                 addVolumeError.set(true);
+                 LOG.error("Error adding volume: " + throwable);
+               } finally {
+                 addVolumeCompletionLatch.countDown();
+               }
+             }
+           });
+       addVolumeDelayedThreads.add(addVolThread);
+       addVolThread.start();
+       return null;
+     }
+   }).when(dn.data).addVolume(any(StorageLocation.class), any(List.class));
+   addVolumes(newVolumeCount, addVolumeCompletionLatch);
+   numVolumes += newVolumeCount;
+   // Wait for all addVolume and listStorage threads to complete
+   for (Thread t : addVolumeDelayedThreads) {
+     t.join();
+   }
+   listStorageThread.join();
+   // Verify that no errors occurred while adding volumes
+   // and listing storage directories.
+   Assert.assertEquals("Error adding volumes!", false, addVolumeError.get());
+   Assert.assertEquals("Error listing storage!",
+       false, listStorageError.get());
+   int additionalBlockCount = 9;
+   int totalBlockCount = initialBlockCount + additionalBlockCount;
+   // Continue to write the same file, thus the new volumes will have blocks.
+   DFSTestUtil.appendFile(cluster.getFileSystem(), testFile,
+       BLOCK_SIZE * additionalBlockCount);
+   verifyFileLength(cluster.getFileSystem(), testFile, totalBlockCount);
+   List<Map<DatanodeStorage, BlockListAsLongs>> blockReports =
+       cluster.getAllBlockReports(blockPoolId);
+   assertEquals(1, blockReports.size());
+   assertEquals(numVolumes, blockReports.get(0).size());
+ }
@Test(timeout=60000)
public void testAddVolumesToFederationNN()
throws IOException, TimeoutException, InterruptedException,
@@ -780,7 +914,7 @@ public class TestDataNodeHotSwapVolumes {
}
/**
- * Verify that {@link DataNode#checkDiskErrors()} removes all metadata in
+ * Verify that {@link DataNode#checkDiskError()} removes all metadata in
* DataNode upon a volume failure. Thus we can run reconfig on the same
* configuration to reload the new volume on the same directory as the failed one.
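A closing note on the testing idiom used in testAddVolumesConcurrently above: the test wraps the real FsDatasetImpl in a Mockito spy and uses doAnswer to push each real addVolume call onto its own thread, so many additions overlap with the storage-listing loop. A stripped-down, hedged sketch of the same idiom follows; VolumeService is a made-up stand-in for the spied dataset, and the matcher/stubbing calls are standard Mockito 2+ API.

    import static org.mockito.ArgumentMatchers.anyString;
    import static org.mockito.Mockito.doAnswer;
    import static org.mockito.Mockito.spy;

    public class AsyncSpySketch {
      // Hypothetical collaborator, present only to demonstrate the pattern.
      public static class VolumeService {
        public void addVolume(String path) {
          System.out.println("adding " + path);
        }
      }

      public static void main(String[] args) throws InterruptedException {
        VolumeService service = spy(new VolumeService());
        doAnswer(invocation -> {
          // Run the real method on a separate thread; the caller returns
          // immediately, so many addVolume calls can be in flight at once.
          Thread worker = new Thread(() -> {
            try {
              invocation.callRealMethod();
            } catch (Throwable t) {
              t.printStackTrace();
            }
          });
          worker.start();
          return null;
        }).when(service).addVolume(anyString());

        service.addVolume("/data1"); // returns without waiting for the real work
        Thread.sleep(200);           // crude wait, good enough for a demo
      }
    }

The production test replaces the crude sleep with a CountDownLatch sized to the number of volumes, counted down in a finally block of each worker thread, which is the reliable way to join on all the deferred operations.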
*/