HDFS-15200. Delete Corrupt Replica Immediately Irrespective of Replicas On Stale Storage. Contributed by Ayush Saxena.
This commit is contained in:
parent
40285be1d5
commit
079d3ea6ac
|
@ -275,6 +275,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
= "dfs.namenode.blockreport.queue.size";
|
= "dfs.namenode.blockreport.queue.size";
|
||||||
public static final int DFS_NAMENODE_BLOCKREPORT_QUEUE_SIZE_DEFAULT
|
public static final int DFS_NAMENODE_BLOCKREPORT_QUEUE_SIZE_DEFAULT
|
||||||
= 1024;
|
= 1024;
|
||||||
|
|
||||||
|
public static final String
|
||||||
|
DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED =
|
||||||
|
"dfs.namenode.corrupt.block.delete.immediately.enabled";
|
||||||
|
public static final boolean
|
||||||
|
DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED_DEFAULT = true;
|
||||||
|
|
||||||
public static final String DFS_WEBHDFS_AUTHENTICATION_FILTER_KEY = "dfs.web.authentication.filter";
|
public static final String DFS_WEBHDFS_AUTHENTICATION_FILTER_KEY = "dfs.web.authentication.filter";
|
||||||
/* Phrased as below to avoid javac inlining as a constant, to match the behavior when
|
/* Phrased as below to avoid javac inlining as a constant, to match the behavior when
|
||||||
this was AuthFilter.class.getName(). Note that if you change the import for AuthFilter, you
|
this was AuthFilter.class.getName(). Note that if you change the import for AuthFilter, you
|
||||||
|
|
|
@ -455,6 +455,11 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
* from ENTERING_MAINTENANCE to IN_MAINTENANCE.
|
* from ENTERING_MAINTENANCE to IN_MAINTENANCE.
|
||||||
*/
|
*/
|
||||||
private final short minReplicationToBeInMaintenance;
|
private final short minReplicationToBeInMaintenance;
|
||||||
|
/**
|
||||||
|
* Whether to delete corrupt replica immediately irrespective of other
|
||||||
|
* replicas available on stale storages.
|
||||||
|
*/
|
||||||
|
private final boolean deleteCorruptReplicaImmediately;
|
||||||
|
|
||||||
/** Storages accessible from multiple DNs. */
|
/** Storages accessible from multiple DNs. */
|
||||||
private final ProvidedStorageMap providedStorageMap;
|
private final ProvidedStorageMap providedStorageMap;
|
||||||
|
@ -607,6 +612,10 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
DFSConfigKeys.DFS_NAMENODE_BLOCKREPORT_QUEUE_SIZE_DEFAULT);
|
DFSConfigKeys.DFS_NAMENODE_BLOCKREPORT_QUEUE_SIZE_DEFAULT);
|
||||||
blockReportThread = new BlockReportProcessingThread(queueSize);
|
blockReportThread = new BlockReportProcessingThread(queueSize);
|
||||||
|
|
||||||
|
this.deleteCorruptReplicaImmediately =
|
||||||
|
conf.getBoolean(DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED,
|
||||||
|
DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED_DEFAULT);
|
||||||
|
|
||||||
LOG.info("defaultReplication = {}", defaultReplication);
|
LOG.info("defaultReplication = {}", defaultReplication);
|
||||||
LOG.info("maxReplication = {}", maxReplication);
|
LOG.info("maxReplication = {}", maxReplication);
|
||||||
LOG.info("minReplication = {}", minReplication);
|
LOG.info("minReplication = {}", minReplication);
|
||||||
|
@ -1864,7 +1873,7 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check how many copies we have of the block
|
// Check how many copies we have of the block
|
||||||
if (nr.replicasOnStaleNodes() > 0) {
|
if (nr.replicasOnStaleNodes() > 0 && !deleteCorruptReplicaImmediately) {
|
||||||
blockLog.debug("BLOCK* invalidateBlocks: postponing " +
|
blockLog.debug("BLOCK* invalidateBlocks: postponing " +
|
||||||
"invalidation of {} on {} because {} replica(s) are located on " +
|
"invalidation of {} on {} because {} replica(s) are located on " +
|
||||||
"nodes with potentially out-of-date block reports", b, dn,
|
"nodes with potentially out-of-date block reports", b, dn,
|
||||||
|
|
|
@ -5426,6 +5426,15 @@
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.namenode.corrupt.block.delete.immediately.enabled</name>
|
||||||
|
<value>true</value>
|
||||||
|
<description>
|
||||||
|
Whether the corrupt replicas should be deleted immediately, irrespective
|
||||||
|
of other replicas on stale storages..
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.journalnode.edits.dir.perm</name>
|
<name>dfs.journalnode.edits.dir.perm</name>
|
||||||
<value>700</value>
|
<value>700</value>
|
||||||
|
|
|
@ -21,7 +21,9 @@ import com.google.common.base.Joiner;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.LinkedListMultimap;
|
import com.google.common.collect.LinkedListMultimap;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
|
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CreateFlag;
|
import org.apache.hadoop.fs.CreateFlag;
|
||||||
|
@ -501,7 +503,41 @@ public class TestBlockManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testDeleteCorruptReplicaWithStatleStorages() throws Exception {
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
|
||||||
|
MIN_REPLICATION, 2);
|
||||||
|
Path file = new Path("/test-file");
|
||||||
|
MiniDFSCluster cluster =
|
||||||
|
new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
||||||
|
try {
|
||||||
|
cluster.waitActive();
|
||||||
|
BlockManager blockManager = cluster.getNamesystem().getBlockManager();
|
||||||
|
blockManager.getDatanodeManager().markAllDatanodesStale();
|
||||||
|
FileSystem fs = cluster.getFileSystem();
|
||||||
|
FSDataOutputStream out = fs.create(file);
|
||||||
|
for (int i = 0; i < 1024 * 1024 * 1; i++) {
|
||||||
|
out.write(i);
|
||||||
|
}
|
||||||
|
out.hflush();
|
||||||
|
MiniDFSCluster.DataNodeProperties datanode = cluster.stopDataNode(0);
|
||||||
|
for (int i = 0; i < 1024 * 1024 * 1; i++) {
|
||||||
|
out.write(i);
|
||||||
|
}
|
||||||
|
out.close();
|
||||||
|
cluster.restartDataNode(datanode);
|
||||||
|
cluster.triggerBlockReports();
|
||||||
|
DataNodeTestUtils.triggerBlockReport(datanode.getDatanode());
|
||||||
|
assertEquals(0, blockManager.getCorruptBlocks());
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tell the block manager that replication is completed for the given
|
* Tell the block manager that replication is completed for the given
|
||||||
* pipeline.
|
* pipeline.
|
||||||
|
|
|
@ -27,6 +27,8 @@ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests corruption of replicas in case of failover.
|
* Tests corruption of replicas in case of failover.
|
||||||
*/
|
*/
|
||||||
|
@ -35,6 +37,8 @@ public class TestCorruptionWithFailover {
|
||||||
@Test
|
@Test
|
||||||
public void testCorruptReplicaAfterFailover() throws Exception {
|
public void testCorruptReplicaAfterFailover() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
|
conf.setBoolean(DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED,
|
||||||
|
false);
|
||||||
// Enable data to be written, to less replicas in case of pipeline failure.
|
// Enable data to be written, to less replicas in case of pipeline failure.
|
||||||
conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
|
conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
|
||||||
MIN_REPLICATION, 2);
|
MIN_REPLICATION, 2);
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hdfs.server.namenode;
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED;
|
||||||
import static org.apache.hadoop.hdfs.MiniDFSCluster.HDFS_MINIDFS_BASEDIR;
|
import static org.apache.hadoop.hdfs.MiniDFSCluster.HDFS_MINIDFS_BASEDIR;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
|
@ -188,6 +189,8 @@ public class TestFsck {
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
conf = new Configuration();
|
conf = new Configuration();
|
||||||
|
conf.setBoolean(DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED,
|
||||||
|
false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
|
Loading…
Reference in New Issue