HDFS-9188. Make block corruption related tests FsDataset-agnostic. (lei)

Lei Xu 2015-10-14 20:18:43 -07:00
parent be7a0add8b
commit c80b3a804f
15 changed files with 473 additions and 140 deletions

CHANGES.txt

@@ -1529,6 +1529,8 @@ Release 2.8.0 - UNRELEASED
     HDFS-9223. Code cleanup for DatanodeDescriptor and HeartbeatManager. (jing9)
 
+    HDFS-9188. Make block corruption related tests FsDataset-agnostic. (lei)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

MiniDFSCluster.java

@@ -51,16 +51,13 @@
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.io.RandomAccessFile;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
@@ -97,6 +94,9 @@
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils.MaterializedReplica;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
@@ -122,7 +122,6 @@
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.ToolRunner;
 
 import com.google.common.base.Joiner;
@@ -1822,6 +1821,28 @@ public DataNode getDataNode(int ipcPort) {
     return null;
   }
 
+  /**
+   * Returns the corresponding FsDatasetTestUtils for a DataNode.
+   * @param dnIdx the index of DataNode.
+   * @return a FsDatasetTestUtils for the given DataNode.
+   */
+  public FsDatasetTestUtils getFsDatasetTestUtils(int dnIdx) {
+    Preconditions.checkArgument(dnIdx < dataNodes.size());
+    return FsDatasetTestUtils.Factory.getFactory(conf)
+        .newInstance(dataNodes.get(dnIdx).datanode);
+  }
+
+  /**
+   * Returns the corresponding FsDatasetTestUtils for a DataNode.
+   * @param dn a DataNode
+   * @return a FsDatasetTestUtils for the given DataNode.
+   */
+  public FsDatasetTestUtils getFsDatasetTestUtils(DataNode dn) {
+    Preconditions.checkArgument(dn != null);
+    return FsDatasetTestUtils.Factory.getFactory(conf)
+        .newInstance(dn);
+  }
+
   /**
    * Gets the rpc port used by the NameNode, because the caller
    * supplied port is not necessarily the actual port used.
@@ -2006,11 +2027,18 @@ public synchronized void restartNameNode(int nnIndex, boolean waitActive,
   private int corruptBlockOnDataNodesHelper(ExtendedBlock block,
       boolean deleteBlockFile) throws IOException {
     int blocksCorrupted = 0;
-    File[] blockFiles = getAllBlockFiles(block);
-    for (File f : blockFiles) {
-      if ((deleteBlockFile && corruptBlockByDeletingBlockFile(f)) ||
-          (!deleteBlockFile && corruptBlock(f))) {
+    for (DataNode dn : getDataNodes()) {
+      try {
+        MaterializedReplica replica =
+            getFsDatasetTestUtils(dn).getMaterializedReplica(block);
+        if (deleteBlockFile) {
+          replica.deleteData();
+        } else {
+          replica.corruptData();
+        }
         blocksCorrupted++;
+      } catch (ReplicaNotFoundException e) {
+        // Ignore.
       }
     }
     return blocksCorrupted;
@@ -2062,46 +2090,33 @@ public byte[] readBlockOnDataNodeAsBytes(int i, ExtendedBlock block)
    *
    * @param i index of the datanode
    * @param blk name of the block
-   * @throws IOException on error accessing the given block or if
-   * the contents of the block (on the same datanode) differ.
-   * @return true if a replica was corrupted, false otherwise
-   * Types: delete, write bad data, truncate
+   * @throws IOException on error accessing the given block file.
    */
-  public boolean corruptReplica(int i, ExtendedBlock blk)
+  public void corruptReplica(int i, ExtendedBlock blk)
       throws IOException {
-    File blockFile = getBlockFile(i, blk);
-    return corruptBlock(blockFile);
+    getMaterializedReplica(i, blk).corruptData();
   }
 
-  /*
-   * Corrupt a block on a particular datanode
+  /**
+   * Corrupt a block on a particular datanode.
+   *
+   * @param dn the datanode
+   * @param blk name of the block
+   * @throws IOException on error accessing the given block file.
    */
-  public static boolean corruptBlock(File blockFile) throws IOException {
-    if (blockFile == null || !blockFile.exists()) {
-      return false;
-    }
-    // Corrupt replica by writing random bytes into replica
-    Random random = new Random();
-    RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
-    FileChannel channel = raFile.getChannel();
-    String badString = "BADBAD";
-    int rand = random.nextInt((int)channel.size()/2);
-    raFile.seek(rand);
-    raFile.write(badString.getBytes());
-    raFile.close();
-    LOG.warn("Corrupting the block " + blockFile);
-    return true;
+  public void corruptReplica(DataNode dn, ExtendedBlock blk)
+      throws IOException {
+    getMaterializedReplica(dn, blk).corruptData();
   }
 
-  /*
-   * Corrupt a block on a particular datanode by deleting the block file
+  /**
+   * Corrupt the metadata of a block on a datanode.
+   * @param i the index of the datanode
+   * @param blk name of the block
+   * @throws IOException on error accessing the given metadata file.
    */
-  public static boolean corruptBlockByDeletingBlockFile(File blockFile)
-      throws IOException {
-    if (blockFile == null || !blockFile.exists()) {
-      return false;
-    }
-    return blockFile.delete();
+  public void corruptMeta(int i, ExtendedBlock blk) throws IOException {
+    getMaterializedReplica(i, blk).corruptMeta();
   }
 
   public boolean changeGenStampOfBlock(int dnIndex, ExtendedBlock blk,
@@ -2758,6 +2773,32 @@ public static File getFinalizedDir(File storageDir, String bpid) {
         + DataStorage.STORAGE_DIR_FINALIZED );
   }
 
+  /**
+   * Get a materialized replica that can be corrupted later.
+   * @param i the index of DataNode.
+   * @param blk name of the block.
+   * @return a materialized replica.
+   * @throws ReplicaNotFoundException if the replica does not exist on the
+   * DataNode.
+   */
+  public MaterializedReplica getMaterializedReplica(
+      int i, ExtendedBlock blk) throws ReplicaNotFoundException {
+    return getFsDatasetTestUtils(i).getMaterializedReplica(blk);
+  }
+
+  /**
+   * Get a materialized replica that can be corrupted later.
+   * @param dn the DataNode.
+   * @param blk name of the block.
+   * @return a materialized replica.
+   * @throws ReplicaNotFoundException if the replica does not exist on the
+   * DataNode.
+   */
+  public MaterializedReplica getMaterializedReplica(
+      DataNode dn, ExtendedBlock blk) throws ReplicaNotFoundException {
+    return getFsDatasetTestUtils(dn).getMaterializedReplica(blk);
+  }
+
   /**
    * Get file correpsonding to a block
    * @param storageDir storage directory
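
Taken together, the additions above give tests a dataset-agnostic way to damage replicas through MiniDFSCluster. A minimal sketch of how a test might use them (the test class, file name, and sizes below are illustrative, not part of this commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils.MaterializedReplica;
import org.junit.Test;

public class TestReplicaCorruptionExample {
  @Test
  public void testCorruptReplicas() throws Exception {
    Configuration conf = new Configuration();
    MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
    try {
      DistributedFileSystem fs = cluster.getFileSystem();
      Path file = new Path("/corruption-example");
      DFSTestUtil.createFile(fs, file, 4096L, (short) 3, 0L);
      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file);

      // Overwrite part of the block data on the first datanode.
      cluster.corruptReplica(0, block);
      // Damage the checksum (meta) file on the second datanode.
      cluster.corruptMeta(1, block);
      // Finer-grained control through MaterializedReplica on the third.
      MaterializedReplica replica = cluster.getMaterializedReplica(2, block);
      replica.truncateData(0);
    } finally {
      cluster.shutdown();
    }
  }
}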

TestClientReportBadBlock.java

@@ -18,11 +18,9 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.ByteArrayOutputStream;
-import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintStream;
-import java.io.RandomAccessFile;
 import java.util.Random;
 import java.util.concurrent.TimeoutException;
@@ -39,7 +37,6 @@
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck;
 import org.apache.hadoop.hdfs.tools.DFSck;
 import org.apache.hadoop.security.AccessControlException;
@@ -217,7 +214,7 @@ private void createAFileWithCorruptedBlockReplicas(Path filePath, short repl,
     for (int i = 0; i < corruptBlockCount; i++) {
       DatanodeInfo dninfo = datanodeinfos[i];
       final DataNode dn = cluster.getDataNode(dninfo.getIpcPort());
-      corruptBlock(block, dn);
+      cluster.corruptReplica(dn, block);
       LOG.debug("Corrupted block " + block.getBlockName() + " on data node "
           + dninfo);
@@ -292,30 +289,6 @@ private void dfsClientReadFileFromPosition(Path corruptedFile)
     }
   }
 
-  /**
-   * Corrupt a block on a data node. Replace the block file content with content
-   * of 1, 2, ...BLOCK_SIZE.
-   *
-   * @param block
-   *          the ExtendedBlock to be corrupted
-   * @param dn
-   *          the data node where the block needs to be corrupted
-   * @throws FileNotFoundException
-   * @throws IOException
-   */
-  private static void corruptBlock(final ExtendedBlock block, final DataNode dn)
-      throws FileNotFoundException, IOException {
-    final File f = DataNodeTestUtils.getBlockFile(
-        dn, block.getBlockPoolId(), block.getLocalBlock());
-    final RandomAccessFile raFile = new RandomAccessFile(f, "rw");
-    final byte[] bytes = new byte[(int) BLOCK_SIZE];
-    for (int i = 0; i < BLOCK_SIZE; i++) {
-      bytes[i] = (byte) (i);
-    }
-    raFile.write(bytes);
-    raFile.close();
-  }
-
   private static void verifyFsckHealth(String expected) throws Exception {
     // Fsck health has error code 0.
     // Make sure filesystem is in healthy state

TestDFSShell.java

@@ -39,8 +39,8 @@
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils.MaterializedReplica;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.io.IOUtils;
@@ -1507,32 +1507,31 @@ public void testDFSShell() throws IOException {
     }
   }
 
-  static List<File> getBlockFiles(MiniDFSCluster cluster) throws IOException {
-    List<File> files = new ArrayList<File>();
-    List<DataNode> datanodes = cluster.getDataNodes();
+  private static List<MaterializedReplica> getMaterializedReplicas(
+      MiniDFSCluster cluster) throws IOException {
+    List<MaterializedReplica> replicas = new ArrayList<>();
     String poolId = cluster.getNamesystem().getBlockPoolId();
-    List<Map<DatanodeStorage, BlockListAsLongs>> blocks = cluster.getAllBlockReports(poolId);
+    List<Map<DatanodeStorage, BlockListAsLongs>> blocks =
+        cluster.getAllBlockReports(poolId);
     for(int i = 0; i < blocks.size(); i++) {
-      DataNode dn = datanodes.get(i);
       Map<DatanodeStorage, BlockListAsLongs> map = blocks.get(i);
       for(Map.Entry<DatanodeStorage, BlockListAsLongs> e : map.entrySet()) {
         for(Block b : e.getValue()) {
-          files.add(DataNodeTestUtils.getFile(dn, poolId, b.getBlockId()));
+          replicas.add(cluster.getMaterializedReplica(i,
+              new ExtendedBlock(poolId, b)));
         }
       }
     }
-    return files;
+    return replicas;
   }
 
-  static void corrupt(List<File> files) throws IOException {
-    for(File f : files) {
-      StringBuilder content = new StringBuilder(DFSTestUtil.readFile(f));
-      char c = content.charAt(0);
-      content.setCharAt(0, ++c);
-      PrintWriter out = new PrintWriter(f);
-      out.print(content);
-      out.flush();
-      out.close();
+  private static void corrupt(
+      List<MaterializedReplica> replicas, String content) throws IOException {
+    StringBuilder sb = new StringBuilder(content);
+    char c = content.charAt(0);
+    sb.setCharAt(0, ++c);
+    for(MaterializedReplica replica : replicas) {
+      replica.corruptData(sb.toString().getBytes("UTF8"));
     }
   }
@@ -1636,7 +1635,7 @@ public String run(int exitcode, String... options) throws IOException {
       assertEquals(localfcontent, runner.run(0, "-ignoreCrc"));
 
       // find block files to modify later
-      List<File> files = getBlockFiles(cluster);
+      List<MaterializedReplica> replicas = getMaterializedReplicas(cluster);
 
       // Shut down cluster and then corrupt the block files by overwriting a
       // portion with junk data.  We must shut down the cluster so that threads
@@ -1649,8 +1648,8 @@ public String run(int exitcode, String... options) throws IOException {
       dfs.close();
       cluster.shutdown();
 
-      show("files=" + files);
-      corrupt(files);
+      show("replicas=" + replicas);
+      corrupt(replicas, localfcontent);
 
       // Start the cluster again, but do not reformat, so prior files remain.
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(false)

TestFileAppend.java

@@ -21,20 +21,18 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.io.File;
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.util.EnumSet;
 import java.util.List;
 
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils.MaterializedReplica;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.mockito.invocation.InvocationOnMock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 import org.mockito.stubbing.Answer;
 
-import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -315,13 +313,7 @@ private void testTC7(boolean appendToNewBlock) throws Exception {
     DatanodeInfo[] datanodeinfos = lb.getLocations();
     assertEquals(repl, datanodeinfos.length);
     final DataNode dn = cluster.getDataNode(datanodeinfos[0].getIpcPort());
-    final File f = DataNodeTestUtils.getBlockFile(
-        dn, blk.getBlockPoolId(), blk.getLocalBlock());
-    final RandomAccessFile raf = new RandomAccessFile(f, "rw");
-    AppendTestUtil.LOG.info("dn=" + dn + ", blk=" + blk + " (length=" + blk.getNumBytes() + ")");
-    assertEquals(len1, raf.length());
-    raf.setLength(0);
-    raf.close();
+    cluster.getMaterializedReplica(dn, blk).truncateData(0);
 
     //c. Open file in "append mode".  Append a new block worth of data. Close file.
     final int len2 = (int)BLOCK_SIZE;

TestMissingBlocksAlert.java

@@ -81,7 +81,7 @@ public void testMissingBlocksAlert()
 
       // Corrupt the block
       ExtendedBlock block = DFSTestUtil.getFirstBlock(dfs, corruptFile);
-      assertTrue(cluster.corruptReplica(0, block));
+      cluster.corruptReplica(0, block);
 
       // read the file so that the corrupt block is reported to NN
       FSDataInputStream in = dfs.open(corruptFile);
@@ -126,7 +126,7 @@ public void testMissingBlocksAlert()
       DFSTestUtil.createFile(dfs, replOneFile, fileLen, (short)1, 0);
       ExtendedBlock replOneBlock = DFSTestUtil.getFirstBlock(
           dfs, replOneFile);
-      assertTrue(cluster.corruptReplica(0, replOneBlock));
+      cluster.corruptReplica(0, replOneBlock);
 
       // read the file so that the corrupt block is reported to NN
       in = dfs.open(replOneFile);

TestBlocksWithNotEnoughRacks.java

@@ -210,7 +210,7 @@ public void testCorruptBlockRereplicatedAcrossRacks() throws Exception {
 
       // Corrupt a replica of the block
       int dnToCorrupt = DFSTestUtil.firstDnWithBlock(cluster, b);
-      assertTrue(cluster.corruptReplica(dnToCorrupt, b));
+      cluster.corruptReplica(dnToCorrupt, b);
 
       // Restart the datanode so blocks are re-scanned, and the corrupt
       // block is detected.

TestOverReplicatedBlocks.java

@@ -67,7 +67,7 @@ public void testProcesOverReplicateBlock() throws Exception {
 
       // corrupt the block on datanode 0
       ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, fileName);
-      assertTrue(cluster.corruptReplica(0, block));
+      cluster.corruptReplica(0, block);
       DataNodeProperties dnProps = cluster.stopDataNode(0);
       // remove block scanner log to trigger block scanning
       File scanCursor = new File(new File(MiniDFSCluster.getFinalizedDir(

TestRBWBlockInvalidation.java

@@ -22,7 +22,6 @@
 import static org.junit.Assume.assumeTrue;
 
 import java.io.Closeable;
-import java.io.File;
 import java.io.IOException;
 import java.util.List;
 
@@ -37,10 +36,8 @@
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils.MaterializedReplica;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.hdfs.server.namenode.ha.TestDNFencing.RandomDeleterPolicy;
@@ -91,19 +88,14 @@ public void testBlockInvalidationWhenRBWReplicaMissedInDN()
       out.writeBytes("HDFS-3157: " + testPath);
       out.hsync();
       cluster.startDataNodes(conf, 1, true, null, null, null);
-      String bpid = namesystem.getBlockPoolId();
       ExtendedBlock blk = DFSTestUtil.getFirstBlock(fs, testPath);
-      Block block = blk.getLocalBlock();
-      DataNode dn = cluster.getDataNodes().get(0);
 
       // Delete partial block and its meta information from the RBW folder
       // of first datanode.
-      File blockFile = DataNodeTestUtils.getBlockFile(dn, bpid, block);
-      File metaFile = DataNodeTestUtils.getMetaFile(dn, bpid, block);
-      assertTrue("Could not delete the block file from the RBW folder",
-          blockFile.delete());
-      assertTrue("Could not delete the block meta file from the RBW folder",
-          metaFile.delete());
+      MaterializedReplica replica = cluster.getMaterializedReplica(0, blk);
+      replica.deleteData();
+      replica.deleteMeta();
 
       out.close();

DataNodeTestUtils.java

@@ -158,20 +158,6 @@ public static FsDatasetSpi<?> getFSDataset(DataNode dn) {
     return dn.getFSDataset();
   }
 
-  public static File getFile(DataNode dn, String bpid, long bid) {
-    return FsDatasetTestUtil.getFile(dn.getFSDataset(), bpid, bid);
-  }
-
-  public static File getBlockFile(DataNode dn, String bpid, Block b
-      ) throws IOException {
-    return FsDatasetTestUtil.getBlockFile(dn.getFSDataset(), bpid, b);
-  }
-
-  public static File getMetaFile(DataNode dn, String bpid, Block b)
-      throws IOException {
-    return FsDatasetTestUtil.getMetaFile(dn.getFSDataset(), bpid, b);
-  }
-
   public static long getPendingAsyncDeletions(DataNode dn) {
     return FsDatasetTestUtil.getPendingAsyncDeletions(dn.getFSDataset());
   }
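
The helpers removed here were the last file-based corruption entry points in DataNodeTestUtils; the call sites in this commit move to MaterializedReplica instead. A rough equivalence, inferred from the call-site changes above (the wrapper class below is illustrative, not part of this commit):

import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils.MaterializedReplica;

class ReplicaFileMigrationExample {
  // Before: locate and delete the raw files.
  //   File blockFile = DataNodeTestUtils.getBlockFile(dn, bpid, blk.getLocalBlock());
  //   File metaFile = DataNodeTestUtils.getMetaFile(dn, bpid, blk.getLocalBlock());
  //   blockFile.delete();
  //   metaFile.delete();
  // After: let the dataset-specific test utils find and delete them.
  static void deleteReplicaFiles(MiniDFSCluster cluster, DataNode dn,
      ExtendedBlock blk) throws java.io.IOException {
    MaterializedReplica replica = cluster.getMaterializedReplica(dn, blk);
    replica.deleteData();
    replica.deleteMeta();
  }
}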

FsDatasetImplTestUtilsFactory.java (new file)

@@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImplTestUtils;
/**
* A factory for creating {@link FsDatasetImplTestUtils} objects.
*/
public final class FsDatasetImplTestUtilsFactory
extends FsDatasetTestUtils.Factory<FsDatasetTestUtils> {
@Override
public FsDatasetTestUtils newInstance(DataNode datanode) {
return new FsDatasetImplTestUtils(datanode);
}
}

FsDatasetTestUtils.java (new file)

@@ -0,0 +1,140 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.FileNotFoundException;
import java.io.IOException;
/**
* Provide block access for FsDataset white box tests.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface FsDatasetTestUtils {
abstract class Factory<D extends FsDatasetTestUtils> {
/**
* By default, it returns FsDatasetImplTestUtilsFactory.
*
* @return The configured Factory.
*/
public static Factory<?> getFactory(Configuration conf) {
String className = conf.get(
DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
FsDatasetFactory.class.getName());
Preconditions.checkState(className.contains("Factory"));
className = className.replaceFirst("(\\$)?Factory$", "TestUtilsFactory");
final Class<? extends Factory> clazz = conf.getClass(
className,
FsDatasetImplTestUtilsFactory.class,
Factory.class);
return ReflectionUtils.newInstance(clazz, conf);
}
/**
* Create a new instance of FsDatasetTestUtils.
*/
public abstract D newInstance(DataNode datanode);
/**
* @return True for SimulatedFsDataset
*/
public boolean isSimulated() {
return false;
}
}
/**
* A replica to be corrupted.
*
* It is safe to corrupt this replica even if the MiniDFSCluster is shut down.
*/
interface MaterializedReplica {
/**
* Corrupt the block file of the replica.
* @throws FileNotFoundException if the block file does not exist.
* @throws IOException if I/O error.
*/
void corruptData() throws IOException;
/**
* Corrupt the block file with the given content.
* @param newContent the new content written to the block file.
* @throws FileNotFoundException if the block file does not exist.
* @throws IOException if I/O error.
*/
void corruptData(byte[] newContent) throws IOException;
/**
* Truncate the block file of the replica to the newSize.
* @param newSize the new size of the block file.
* @throws FileNotFoundException if the block file does not exist.
* @throws IOException if I/O error.
*/
void truncateData(long newSize) throws IOException;
/**
* Delete the block file of the replica.
* @throws FileNotFoundException if the block file does not exist.
* @throws IOException if I/O error.
*/
void deleteData() throws IOException;
/**
* Corrupt the metadata file of the replica.
* @throws FileNotFoundException if the metadata file does not exist.
* @throws IOException if I/O error.
*/
void corruptMeta() throws IOException;
/**
* Delete the metadata file of the replica.
* @throws FileNotFoundException if the metadata file does not exist.
* @throws IOException if I/O error.
*/
void deleteMeta() throws IOException;
/**
* Truncate the metadata file of the replica to the newSize.
* @param newSize the new size of the metadata file.
* @throws FileNotFoundException if the metadata file does not exist.
* @throws IOException if I/O error.
*/
void truncateMeta(long newSize) throws IOException;
}
/**
* Get a materialized replica to corrupt its block / crc later.
* @param block the block this replica belongs to.
* @return a replica to corrupt.
* @throws ReplicaNotFoundException if the replica does not exist on the
* dataset.
*/
MaterializedReplica getMaterializedReplica(ExtendedBlock block)
throws ReplicaNotFoundException;
}
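
As getFactory() above reads, the configured FsDatasetSpi factory class name has its Factory suffix rewritten to TestUtilsFactory, and that rewritten name is then looked up as a configuration key naming the test-utils factory, defaulting to FsDatasetImplTestUtilsFactory. A non-default dataset implementation could therefore plug in its own utilities. A hypothetical sketch (the class below is illustrative only and assumes a dataset that keeps no on-disk replica files; it is not part of this commit):

package org.apache.hadoop.hdfs.server.datanode;

import org.apache.hadoop.hdfs.protocol.ExtendedBlock;

// Hypothetical factory for an FsDataset implementation with no physical
// replica files, so there is never anything to materialize and corrupt.
public final class InMemoryDatasetTestUtilsFactory
    extends FsDatasetTestUtils.Factory<FsDatasetTestUtils> {
  @Override
  public FsDatasetTestUtils newInstance(final DataNode datanode) {
    return new FsDatasetTestUtils() {
      @Override
      public MaterializedReplica getMaterializedReplica(ExtendedBlock block)
          throws ReplicaNotFoundException {
        // No block file exists on disk for this dataset.
        throw new ReplicaNotFoundException(block);
      }
    };
  }

  @Override
  public boolean isSimulated() {
    return true;
  }
}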

FsDatasetImplTestUtils.java (new file)

@@ -0,0 +1,179 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.util.Random;
/**
* Test-related utilities to access blocks in {@link FsDatasetImpl}.
*/
@InterfaceStability.Unstable
@InterfaceAudience.Private
public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
private static final Log LOG =
LogFactory.getLog(FsDatasetImplTestUtils.class);
private final FsDatasetImpl dataset;
/**
* A reference to the replica that is used to corrupt block / meta later.
*/
private static class FsDatasetImplMaterializedReplica
implements MaterializedReplica {
/** Block file of the replica. */
private final File blockFile;
private final File metaFile;
/** Check the existence of the file. */
private static void checkFile(File file) throws FileNotFoundException {
if (file == null || !file.exists()) {
throw new FileNotFoundException(
"The block file or metadata file " + file + " does not exist.");
}
}
/** Corrupt a block / crc file by truncating it to a newSize */
private static void truncate(File file, long newSize)
throws IOException {
Preconditions.checkArgument(newSize >= 0);
checkFile(file);
try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {
raf.setLength(newSize);
}
}
/** Corrupt a block / crc file by deleting it. */
private static void delete(File file) throws IOException {
checkFile(file);
Files.delete(file.toPath());
}
FsDatasetImplMaterializedReplica(File blockFile, File metaFile) {
this.blockFile = blockFile;
this.metaFile = metaFile;
}
@Override
public void corruptData() throws IOException {
checkFile(blockFile);
LOG.info("Corrupting block file: " + blockFile);
final int BUF_SIZE = 32;
byte[] buf = new byte[BUF_SIZE];
try (RandomAccessFile raf = new RandomAccessFile(blockFile, "rw")) {
int nread = raf.read(buf);
for (int i = 0; i < nread; i++) {
buf[i]++;
}
raf.seek(0);
raf.write(buf);
}
}
@Override
public void corruptData(byte[] newContent) throws IOException {
checkFile(blockFile);
LOG.info("Corrupting block file with new content: " + blockFile);
try (RandomAccessFile raf = new RandomAccessFile(blockFile, "rw")) {
raf.write(newContent);
}
}
@Override
public void truncateData(long newSize) throws IOException {
LOG.info("Truncating block file: " + blockFile);
truncate(blockFile, newSize);
}
@Override
public void deleteData() throws IOException {
LOG.info("Deleting block file: " + blockFile);
delete(blockFile);
}
@Override
public void corruptMeta() throws IOException {
checkFile(metaFile);
LOG.info("Corrupting meta file: " + metaFile);
Random random = new Random();
try (RandomAccessFile raf = new RandomAccessFile(metaFile, "rw")) {
FileChannel channel = raf.getChannel();
int offset = random.nextInt((int)channel.size() / 2);
raf.seek(offset);
raf.write("BADBAD".getBytes());
}
}
@Override
public void deleteMeta() throws IOException {
LOG.info("Deleting metadata file: " + metaFile);
delete(metaFile);
}
@Override
public void truncateMeta(long newSize) throws IOException {
LOG.info("Truncating metadata file: " + metaFile);
truncate(metaFile, newSize);
}
@Override
public String toString() {
return String.format("MaterializedReplica: file=%s", blockFile);
}
}
public FsDatasetImplTestUtils(DataNode datanode) {
Preconditions.checkArgument(
datanode.getFSDataset() instanceof FsDatasetImpl);
dataset = (FsDatasetImpl) datanode.getFSDataset();
}
/**
* Return a materialized replica from the FsDatasetImpl.
*/
@Override
public MaterializedReplica getMaterializedReplica(ExtendedBlock block)
throws ReplicaNotFoundException {
File blockFile;
try {
blockFile = dataset.getBlockFile(
block.getBlockPoolId(), block.getBlockId());
} catch (IOException e) {
LOG.error("Block file for " + block + " does not existed:", e);
throw new ReplicaNotFoundException(block);
}
File metaFile = FsDatasetUtil.getMetaFile(
blockFile, block.getGenerationStamp());
return new FsDatasetImplMaterializedReplica(blockFile, metaFile);
}
}
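
Because FsDatasetImplMaterializedReplica captures the block and meta File handles when it is created, a test can resolve replicas first and corrupt them after the cluster is shut down, which is the pattern TestDFSShell relies on above. A small sketch of that pattern (the helper class and method names here are illustrative):

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils.MaterializedReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;

public class CorruptAfterShutdownExample {
  /** Resolve every replica of a block, then corrupt them once no DataNode is running. */
  static void corruptAllReplicasAfterShutdown(MiniDFSCluster cluster,
      ExtendedBlock block) throws Exception {
    List<MaterializedReplica> replicas = new ArrayList<>();
    for (int i = 0; i < cluster.getDataNodes().size(); i++) {
      try {
        replicas.add(cluster.getMaterializedReplica(i, block));
      } catch (ReplicaNotFoundException e) {
        // This DataNode does not host the block; skip it.
      }
    }
    cluster.shutdown();
    for (MaterializedReplica replica : replicas) {
      replica.corruptData();
    }
  }
}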

TestScrLazyPersistFiles.java

@@ -19,12 +19,9 @@
 import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.ChecksumException;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.ClientContext;
 import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.io.nativeio.NativeIO;
@@ -264,9 +261,7 @@ public void doShortCircuitReadMetaFileCorruptionTest() throws IOException,
     // Corrupt the lazy-persisted checksum file, and verify that checksum
     // verification catches it.
     ensureFileReplicasOnStorageType(path1, DEFAULT);
-    File metaFile = cluster.getBlockMetadataFile(0,
-        DFSTestUtil.getFirstBlock(fs, path1));
-    MiniDFSCluster.corruptBlock(metaFile);
+    cluster.corruptMeta(0, DFSTestUtil.getFirstBlock(fs, path1));
     exception.expect(ChecksumException.class);
     DFSTestUtil.readFileBuffer(fs, path1);
   }

TestProcessCorruptBlocks.java

@@ -268,10 +268,12 @@ private static NumberReplicas countReplicas(final FSNamesystem namesystem, Exten
   private void corruptBlock(MiniDFSCluster cluster, FileSystem fs, final Path fileName,
       int dnIndex, ExtendedBlock block) throws IOException {
-    // corrupt the block on datanode dnIndex
+    // Truncate the block on the first datanode that has not been corrupted,
+    // so that directory scanner can discover the corruption from file size
+    // change.
     // the indexes change once the nodes are restarted.
     // But the datadirectory will not change
-    assertTrue(cluster.corruptReplica(dnIndex, block));
+    cluster.getMaterializedReplica(0, block).truncateData(10);
 
     // Run directory scanner to update the DN's volume map
     DataNodeTestUtils.runDirectoryScanner(cluster.getDataNodes().get(0));