HDFS-10282. The VolumeScanner should warn about replica files which are misplaced. Contributed by Colin Patrick McCabe.
This commit is contained in:
parent
df18b6e984
commit
0d1c1152f1
|
@ -914,13 +914,13 @@ public class DirectoryScanner implements Runnable {
|
|||
*/
|
||||
private void verifyFileLocation(File actualBlockFile,
|
||||
File bpFinalizedDir, long blockId) {
|
||||
File blockDir = DatanodeUtil.idToBlockDir(bpFinalizedDir, blockId);
|
||||
if (actualBlockFile.getParentFile().compareTo(blockDir) != 0) {
|
||||
File expBlockFile = new File(blockDir, actualBlockFile.getName());
|
||||
LOG.warn("Block: " + blockId
|
||||
+ " has to be upgraded to block ID-based layout. "
|
||||
+ "Actual block file path: " + actualBlockFile
|
||||
+ ", expected block file path: " + expBlockFile);
|
||||
File expectedBlockDir =
|
||||
DatanodeUtil.idToBlockDir(bpFinalizedDir, blockId);
|
||||
File actualBlockDir = actualBlockFile.getParentFile();
|
||||
if (actualBlockDir.compareTo(expectedBlockDir) != 0) {
|
||||
LOG.warn("Block: " + blockId +
|
||||
" found in invalid directory. Expected directory: " +
|
||||
expectedBlockDir + ". Actual directory: " + actualBlockDir);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -414,7 +414,7 @@ public class VolumeScanner extends Thread {
|
|||
Block b = volume.getDataset().getStoredBlock(
|
||||
cblock.getBlockPoolId(), cblock.getBlockId());
|
||||
if (b == null) {
|
||||
LOG.info("FileNotFound while finding block {} on volume {}",
|
||||
LOG.info("Replica {} was not found in the VolumeMap for volume {}",
|
||||
cblock, volume.getBasePath());
|
||||
} else {
|
||||
block = new ExtendedBlock(cblock.getBlockPoolId(), b);
|
||||
|
|
|
@ -697,6 +697,18 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
|||
} else {
|
||||
ExtendedBlock block =
|
||||
new ExtendedBlock(bpid, Block.filename2id(state.curEntry));
|
||||
File expectedBlockDir = DatanodeUtil.idToBlockDir(
|
||||
new File("."), block.getBlockId());
|
||||
File actualBlockDir = Paths.get(".",
|
||||
state.curFinalizedDir, state.curFinalizedSubDir).toFile();
|
||||
if (!expectedBlockDir.equals(actualBlockDir)) {
|
||||
LOG.error("nextBlock({}, {}): block id {} found in invalid " +
|
||||
"directory. Expected directory: {}. " +
|
||||
"Actual directory: {}", storageID, bpid,
|
||||
block.getBlockId(), expectedBlockDir.getPath(),
|
||||
actualBlockDir.getPath());
|
||||
continue;
|
||||
}
|
||||
LOG.trace("nextBlock({}, {}): advancing to {}",
|
||||
storageID, bpid, block);
|
||||
return block;
|
||||
|
|
|
@ -135,6 +135,13 @@ public interface FsDatasetTestUtils {
|
|||
* @throws IOException I/O error.
|
||||
*/
|
||||
void truncateMeta(long newSize) throws IOException;
|
||||
|
||||
/**
|
||||
* Make the replica unreachable, perhaps by renaming it to an
|
||||
* invalid file name.
|
||||
* @throws IOException On I/O error.
|
||||
*/
|
||||
void makeUnreachable() throws IOException;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -24,6 +24,7 @@ import static org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf.INTERNAL_
|
|||
import static org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf.INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.File;
|
||||
|
@ -38,6 +39,7 @@ import java.util.concurrent.Semaphore;
|
|||
import com.google.common.base.Supplier;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils.MaterializedReplica;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.VolumeScanner.ScanResultHandler;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -139,6 +141,11 @@ public class TestBlockScanner {
|
|||
throws Exception {
|
||||
return DFSTestUtil.getFirstBlock(dfs[nsIdx], getPath(fileIdx));
|
||||
}
|
||||
|
||||
public MaterializedReplica getMaterializedReplica(int nsIdx, int fileIdx)
|
||||
throws Exception {
|
||||
return cluster.getMaterializedReplica(0, getFileBlock(nsIdx, fileIdx));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -806,4 +813,60 @@ public class TestBlockScanner {
|
|||
info.blocksScanned = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that blocks which are in the wrong location are ignored.
|
||||
*/
|
||||
@Test(timeout=120000)
|
||||
public void testIgnoreMisplacedBlock() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
// Set a really long scan period.
|
||||
conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
|
||||
conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
|
||||
TestScanResultHandler.class.getName());
|
||||
conf.setLong(INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS, 0L);
|
||||
final TestContext ctx = new TestContext(conf, 1);
|
||||
final int NUM_FILES = 4;
|
||||
ctx.createFiles(0, NUM_FILES, 5);
|
||||
MaterializedReplica unreachableReplica = ctx.getMaterializedReplica(0, 1);
|
||||
ExtendedBlock unreachableBlock = ctx.getFileBlock(0, 1);
|
||||
unreachableReplica.makeUnreachable();
|
||||
final TestScanResultHandler.Info info =
|
||||
TestScanResultHandler.getInfo(ctx.volumes.get(0));
|
||||
String storageID = ctx.volumes.get(0).getStorageID();
|
||||
synchronized (info) {
|
||||
info.sem = new Semaphore(NUM_FILES);
|
||||
info.shouldRun = true;
|
||||
info.notify();
|
||||
}
|
||||
// Scan the first 4 blocks
|
||||
LOG.info("Waiting for the blocks to be scanned.");
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
synchronized (info) {
|
||||
if (info.blocksScanned >= NUM_FILES - 1) {
|
||||
LOG.info("info = {}. blockScanned has now reached " +
|
||||
info.blocksScanned, info);
|
||||
return true;
|
||||
} else {
|
||||
LOG.info("info = {}. Waiting for blockScanned to reach " +
|
||||
(NUM_FILES - 1), info);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}, 50, 30000);
|
||||
// We should have scanned 4 blocks
|
||||
synchronized (info) {
|
||||
assertFalse(info.goodBlocks.contains(unreachableBlock));
|
||||
assertFalse(info.badBlocks.contains(unreachableBlock));
|
||||
assertEquals("Expected 3 good blocks.", 3, info.goodBlocks.size());
|
||||
info.goodBlocks.clear();
|
||||
assertEquals("Expected 3 blocksScanned", 3, info.blocksScanned);
|
||||
assertEquals("Did not expect bad blocks.", 0, info.badBlocks.size());
|
||||
info.blocksScanned = 0;
|
||||
}
|
||||
info.sem.release(1);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -170,6 +170,27 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
|
|||
truncate(metaFile, newSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void makeUnreachable() throws IOException {
|
||||
long blockId = Block.getBlockId(blockFile.getAbsolutePath());
|
||||
File origDir = blockFile.getParentFile();
|
||||
File root = origDir.getParentFile().getParentFile();
|
||||
File newDir = null;
|
||||
// Keep incrementing the block ID until the block and metadata
|
||||
// files end up in a different directory. Actually, with the
|
||||
// current replica file placement scheme, this should only ever
|
||||
// require one increment, but this is a bit of defensive coding.
|
||||
do {
|
||||
blockId++;
|
||||
newDir = DatanodeUtil.idToBlockDir(root, blockId);
|
||||
} while (origDir.equals(newDir));
|
||||
Files.createDirectories(newDir.toPath());
|
||||
Files.move(blockFile.toPath(),
|
||||
new File(newDir, blockFile.getName()).toPath());
|
||||
Files.move(metaFile.toPath(),
|
||||
new File(newDir, metaFile.getName()).toPath());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("MaterializedReplica: file=%s", blockFile);
|
||||
|
|
Loading…
Reference in New Issue