HDFS-11251. ConcurrentModificationException during DataNode#refreshVolumes. (Manoj Govindassamy via lei)
(cherry picked from commit e9f1396834)
This commit is contained in:
parent
af266c8c85
commit
f349c7692d
|
@ -32,6 +32,7 @@ import java.util.Arrays;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -121,8 +122,9 @@ public abstract class Storage extends StorageInfo {
|
||||||
public StorageDirType getStorageDirType();
|
public StorageDirType getStorageDirType();
|
||||||
public boolean isOfType(StorageDirType type);
|
public boolean isOfType(StorageDirType type);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected List<StorageDirectory> storageDirs = new ArrayList<StorageDirectory>();
|
protected List<StorageDirectory> storageDirs =
|
||||||
|
new CopyOnWriteArrayList<StorageDirectory>();
|
||||||
|
|
||||||
private class DirIterator implements Iterator<StorageDirectory> {
|
private class DirIterator implements Iterator<StorageDirectory> {
|
||||||
final StorageDirType dirType;
|
final StorageDirType dirType;
|
||||||
|
|
|
@ -296,7 +296,7 @@ public class BlockPoolSliceStorage extends Storage {
|
||||||
it.hasNext(); ) {
|
it.hasNext(); ) {
|
||||||
StorageDirectory sd = it.next();
|
StorageDirectory sd = it.next();
|
||||||
if (sd.getRoot().getAbsoluteFile().equals(absPathToRemove)) {
|
if (sd.getRoot().getAbsoluteFile().equals(absPathToRemove)) {
|
||||||
it.remove();
|
this.storageDirs.remove(sd);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -519,7 +519,7 @@ public class DataStorage extends Storage {
|
||||||
bpsStorage.remove(bpRoot.getAbsoluteFile());
|
bpsStorage.remove(bpRoot.getAbsoluteFile());
|
||||||
}
|
}
|
||||||
|
|
||||||
it.remove();
|
this.storageDirs.remove(sd);
|
||||||
try {
|
try {
|
||||||
sd.unlock();
|
sd.unlock();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
|
|
@ -47,7 +47,9 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
@ -62,8 +64,10 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.concurrent.BrokenBarrierException;
|
import java.util.concurrent.BrokenBarrierException;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.CyclicBarrier;
|
import java.util.concurrent.CyclicBarrier;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -91,6 +95,7 @@ public class TestDataNodeHotSwapVolumes {
|
||||||
private static final Log LOG = LogFactory.getLog(
|
private static final Log LOG = LogFactory.getLog(
|
||||||
TestDataNodeHotSwapVolumes.class);
|
TestDataNodeHotSwapVolumes.class);
|
||||||
private static final int BLOCK_SIZE = 512;
|
private static final int BLOCK_SIZE = 512;
|
||||||
|
private static final int DEFAULT_STORAGES_PER_DATANODE = 2;
|
||||||
private MiniDFSCluster cluster;
|
private MiniDFSCluster cluster;
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
@ -100,6 +105,11 @@ public class TestDataNodeHotSwapVolumes {
|
||||||
|
|
||||||
private void startDFSCluster(int numNameNodes, int numDataNodes)
|
private void startDFSCluster(int numNameNodes, int numDataNodes)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
startDFSCluster(numNameNodes, numDataNodes, DEFAULT_STORAGES_PER_DATANODE);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startDFSCluster(int numNameNodes, int numDataNodes,
|
||||||
|
int storagePerDataNode) throws IOException {
|
||||||
shutdown();
|
shutdown();
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
||||||
|
@ -121,6 +131,7 @@ public class TestDataNodeHotSwapVolumes {
|
||||||
cluster = new MiniDFSCluster.Builder(conf)
|
cluster = new MiniDFSCluster.Builder(conf)
|
||||||
.nnTopology(nnTopology)
|
.nnTopology(nnTopology)
|
||||||
.numDataNodes(numDataNodes)
|
.numDataNodes(numDataNodes)
|
||||||
|
.storagesPerDatanode(storagePerDataNode)
|
||||||
.build();
|
.build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
}
|
}
|
||||||
|
@ -279,7 +290,12 @@ public class TestDataNodeHotSwapVolumes {
|
||||||
|
|
||||||
/** Add volumes to the first DataNode. */
|
/** Add volumes to the first DataNode. */
|
||||||
private void addVolumes(int numNewVolumes)
|
private void addVolumes(int numNewVolumes)
|
||||||
throws ReconfigurationException, IOException {
|
throws InterruptedException, IOException, ReconfigurationException {
|
||||||
|
addVolumes(numNewVolumes, new CountDownLatch(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addVolumes(int numNewVolumes, CountDownLatch waitLatch)
|
||||||
|
throws ReconfigurationException, IOException, InterruptedException {
|
||||||
File dataDir = new File(cluster.getDataDirectory());
|
File dataDir = new File(cluster.getDataDirectory());
|
||||||
DataNode dn = cluster.getDataNodes().get(0); // First DataNode.
|
DataNode dn = cluster.getDataNodes().get(0); // First DataNode.
|
||||||
Configuration conf = dn.getConf();
|
Configuration conf = dn.getConf();
|
||||||
|
@ -311,6 +327,9 @@ public class TestDataNodeHotSwapVolumes {
|
||||||
dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newDataDir),
|
dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newDataDir),
|
||||||
is(conf.get(DFS_DATANODE_DATA_DIR_KEY)));
|
is(conf.get(DFS_DATANODE_DATA_DIR_KEY)));
|
||||||
|
|
||||||
|
// Await on the latch for needed operations to complete
|
||||||
|
waitLatch.await();
|
||||||
|
|
||||||
// Verify the configuration value is appropriately set.
|
// Verify the configuration value is appropriately set.
|
||||||
String[] effectiveDataDirs = conf.get(DFS_DATANODE_DATA_DIR_KEY).split(",");
|
String[] effectiveDataDirs = conf.get(DFS_DATANODE_DATA_DIR_KEY).split(",");
|
||||||
String[] expectDataDirs = newDataDir.split(",");
|
String[] expectDataDirs = newDataDir.split(",");
|
||||||
|
@ -398,23 +417,34 @@ public class TestDataNodeHotSwapVolumes {
|
||||||
throws IOException, InterruptedException, TimeoutException,
|
throws IOException, InterruptedException, TimeoutException,
|
||||||
ReconfigurationException {
|
ReconfigurationException {
|
||||||
startDFSCluster(1, 1);
|
startDFSCluster(1, 1);
|
||||||
|
int numVolumes = cluster.getStoragesPerDatanode();
|
||||||
String bpid = cluster.getNamesystem().getBlockPoolId();
|
String bpid = cluster.getNamesystem().getBlockPoolId();
|
||||||
Path testFile = new Path("/test");
|
Path testFile = new Path("/test");
|
||||||
createFile(testFile, 4); // Each volume has 2 blocks.
|
|
||||||
|
|
||||||
addVolumes(2);
|
// Each volume has 2 blocks
|
||||||
|
int initialBlockCount = numVolumes * 2;
|
||||||
|
createFile(testFile, initialBlockCount);
|
||||||
|
|
||||||
|
int newVolumeCount = 5;
|
||||||
|
addVolumes(newVolumeCount);
|
||||||
|
numVolumes += newVolumeCount;
|
||||||
|
|
||||||
|
int additionalBlockCount = 9;
|
||||||
|
int totalBlockCount = initialBlockCount + additionalBlockCount;
|
||||||
|
|
||||||
// Continue to write the same file, thus the new volumes will have blocks.
|
// Continue to write the same file, thus the new volumes will have blocks.
|
||||||
DFSTestUtil.appendFile(cluster.getFileSystem(), testFile, BLOCK_SIZE * 8);
|
DFSTestUtil.appendFile(cluster.getFileSystem(), testFile,
|
||||||
verifyFileLength(cluster.getFileSystem(), testFile, 8 + 4);
|
BLOCK_SIZE * additionalBlockCount);
|
||||||
// After appending data, there should be [2, 2, 4, 4] blocks in each volume
|
verifyFileLength(cluster.getFileSystem(), testFile, totalBlockCount);
|
||||||
// respectively.
|
|
||||||
List<Integer> expectedNumBlocks = Arrays.asList(2, 2, 4, 4);
|
// After appending data, each new volume added should
|
||||||
|
// have 1 block each.
|
||||||
|
List<Integer> expectedNumBlocks = Arrays.asList(1, 1, 1, 1, 1, 4, 4);
|
||||||
|
|
||||||
List<Map<DatanodeStorage, BlockListAsLongs>> blockReports =
|
List<Map<DatanodeStorage, BlockListAsLongs>> blockReports =
|
||||||
cluster.getAllBlockReports(bpid);
|
cluster.getAllBlockReports(bpid);
|
||||||
assertEquals(1, blockReports.size()); // 1 DataNode
|
assertEquals(1, blockReports.size()); // 1 DataNode
|
||||||
assertEquals(4, blockReports.get(0).size()); // 4 volumes
|
assertEquals(numVolumes, blockReports.get(0).size()); // 7 volumes
|
||||||
Map<DatanodeStorage, BlockListAsLongs> dnReport =
|
Map<DatanodeStorage, BlockListAsLongs> dnReport =
|
||||||
blockReports.get(0);
|
blockReports.get(0);
|
||||||
List<Integer> actualNumBlocks = new ArrayList<Integer>();
|
List<Integer> actualNumBlocks = new ArrayList<Integer>();
|
||||||
|
@ -425,6 +455,110 @@ public class TestDataNodeHotSwapVolumes {
|
||||||
assertEquals(expectedNumBlocks, actualNumBlocks);
|
assertEquals(expectedNumBlocks, actualNumBlocks);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=180000)
|
||||||
|
public void testAddVolumesConcurrently()
|
||||||
|
throws IOException, InterruptedException, TimeoutException,
|
||||||
|
ReconfigurationException {
|
||||||
|
startDFSCluster(1, 1, 10);
|
||||||
|
int numVolumes = cluster.getStoragesPerDatanode();
|
||||||
|
String blockPoolId = cluster.getNamesystem().getBlockPoolId();
|
||||||
|
Path testFile = new Path("/test");
|
||||||
|
|
||||||
|
// Each volume has 2 blocks
|
||||||
|
int initialBlockCount = numVolumes * 2;
|
||||||
|
createFile(testFile, initialBlockCount);
|
||||||
|
|
||||||
|
final DataNode dn = cluster.getDataNodes().get(0);
|
||||||
|
final FsDatasetSpi<? extends FsVolumeSpi> data = dn.data;
|
||||||
|
dn.data = Mockito.spy(data);
|
||||||
|
|
||||||
|
final int newVolumeCount = 40;
|
||||||
|
final List<Thread> addVolumeDelayedThreads = new ArrayList<>();
|
||||||
|
final AtomicBoolean addVolumeError = new AtomicBoolean(false);
|
||||||
|
final AtomicBoolean listStorageError = new AtomicBoolean(false);
|
||||||
|
final CountDownLatch addVolumeCompletionLatch =
|
||||||
|
new CountDownLatch(newVolumeCount);
|
||||||
|
|
||||||
|
// Thread to list all storage available at DataNode,
|
||||||
|
// when the volumes are being added in parallel.
|
||||||
|
final Thread listStorageThread = new Thread(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
while (addVolumeCompletionLatch.getCount() != newVolumeCount) {
|
||||||
|
int i = 0;
|
||||||
|
while(i++ < 1000) {
|
||||||
|
try {
|
||||||
|
dn.getStorage().listStorageDirectories();
|
||||||
|
} catch (Exception e) {
|
||||||
|
listStorageError.set(true);
|
||||||
|
LOG.error("Error listing storage: " + e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
listStorageThread.start();
|
||||||
|
|
||||||
|
// FsDatasetImpl addVolume mocked to perform the operation asynchronously
|
||||||
|
doAnswer(new Answer<Object>() {
|
||||||
|
@Override
|
||||||
|
public Object answer(final InvocationOnMock invocationOnMock) throws Throwable {
|
||||||
|
final Random r = new Random();
|
||||||
|
Thread addVolThread =
|
||||||
|
new Thread(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
r.setSeed(Time.now());
|
||||||
|
// Let 50% of add volume operations
|
||||||
|
// start after an initial delay.
|
||||||
|
if (r.nextInt(10) > 4) {
|
||||||
|
int s = r.nextInt(10) + 1;
|
||||||
|
Thread.sleep(s * 100);
|
||||||
|
}
|
||||||
|
invocationOnMock.callRealMethod();
|
||||||
|
} catch (Throwable throwable) {
|
||||||
|
addVolumeError.set(true);
|
||||||
|
LOG.error("Error adding volume: " + throwable);
|
||||||
|
} finally {
|
||||||
|
addVolumeCompletionLatch.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
addVolumeDelayedThreads.add(addVolThread);
|
||||||
|
addVolThread.start();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}).when(dn.data).addVolume(any(StorageLocation.class), any(List.class));
|
||||||
|
|
||||||
|
addVolumes(newVolumeCount, addVolumeCompletionLatch);
|
||||||
|
numVolumes += newVolumeCount;
|
||||||
|
|
||||||
|
// Wait for all addVolume and listStorage Threads to complete
|
||||||
|
for (Thread t : addVolumeDelayedThreads) {
|
||||||
|
t.join();
|
||||||
|
}
|
||||||
|
listStorageThread.join();
|
||||||
|
|
||||||
|
// Verify errors while adding volumes and listing storage directories
|
||||||
|
Assert.assertEquals("Error adding volumes!", false, addVolumeError.get());
|
||||||
|
Assert.assertEquals("Error listing storage!",
|
||||||
|
false, listStorageError.get());
|
||||||
|
|
||||||
|
int additionalBlockCount = 9;
|
||||||
|
int totalBlockCount = initialBlockCount + additionalBlockCount;
|
||||||
|
|
||||||
|
// Continue to write the same file, thus the new volumes will have blocks.
|
||||||
|
DFSTestUtil.appendFile(cluster.getFileSystem(), testFile,
|
||||||
|
BLOCK_SIZE * additionalBlockCount);
|
||||||
|
verifyFileLength(cluster.getFileSystem(), testFile, totalBlockCount);
|
||||||
|
|
||||||
|
List<Map<DatanodeStorage, BlockListAsLongs>> blockReports =
|
||||||
|
cluster.getAllBlockReports(blockPoolId);
|
||||||
|
assertEquals(1, blockReports.size());
|
||||||
|
assertEquals(numVolumes, blockReports.get(0).size());
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout=60000)
|
@Test(timeout=60000)
|
||||||
public void testAddVolumesToFederationNN()
|
public void testAddVolumesToFederationNN()
|
||||||
throws IOException, TimeoutException, InterruptedException,
|
throws IOException, TimeoutException, InterruptedException,
|
||||||
|
@ -778,7 +912,7 @@ public class TestDataNodeHotSwapVolumes {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Verify that {@link DataNode#checkDiskErrors()} removes all metadata in
|
* Verify that {@link DataNode#checkDiskError()} removes all metadata in
|
||||||
* DataNode upon a volume failure. Thus we can run reconfig on the same
|
* DataNode upon a volume failure. Thus we can run reconfig on the same
|
||||||
* configuration to reload the new volume on the same directory as the failed one.
|
* configuration to reload the new volume on the same directory as the failed one.
|
||||||
*/
|
*/
|
||||||
|
|
Loading…
Reference in New Issue