diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index c5b5326f5be..0ecda4b3896 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -173,6 +173,8 @@ Release 2.6.0 - UNRELEASED
 
     HDFS-6634. inotify in HDFS. (James Thomas via wang)
 
+    HDFS-4257. The ReplaceDatanodeOnFailure policies could have a forgiving
+    option (szetszwo via cmccabe)
 
   OPTIMIZATIONS
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index f1a9705ad9c..bf7f5ba7c8c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -53,6 +53,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT = true;
   public static final String  DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY = "dfs.client.block.write.replace-datanode-on-failure.policy";
   public static final String  DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT = "DEFAULT";
+  public static final String  DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY = "dfs.client.block.write.replace-datanode-on-failure.best-effort";
+  public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_DEFAULT = false;
   public static final String  DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY = "dfs.client.socketcache.capacity";
   public static final int     DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16;
   public static final String  DFS_CLIENT_USE_DN_HOSTNAME = "dfs.client.use.datanode.hostname";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index f7c1e3be941..5526cb1af63 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -1178,7 +1178,17 @@ public class DFSOutputStream extends FSOutputSummer
       // Check if replace-datanode policy is satisfied.
       if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(blockReplication,
           nodes, isAppend, isHflushed)) {
-        addDatanode2ExistingPipeline();
+        try {
+          addDatanode2ExistingPipeline();
+        } catch(IOException ioe) {
+          if (!dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) {
+            throw ioe;
+          }
+          DFSClient.LOG.warn("Failed to replace datanode."
+ + " Continue with the remaining datanodes since " + + DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY + + " is set to true.", ioe); + } } // get a new generation stamp and an access token diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java index 318455424aa..0f2c1abdf15 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java @@ -29,26 +29,90 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; */ @InterfaceAudience.Private @InterfaceStability.Evolving -public enum ReplaceDatanodeOnFailure { - /** The feature is disabled in the entire site. */ - DISABLE, - /** Never add a new datanode. */ - NEVER, - /** - * DEFAULT policy: - * Let r be the replication number. - * Let n be the number of existing datanodes. - * Add a new datanode only if r >= 3 and either - * (1) floor(r/2) >= n; or - * (2) r > n and the block is hflushed/appended. - */ - DEFAULT, - /** Always add a new datanode when an existing datanode is removed. */ - ALWAYS; +public class ReplaceDatanodeOnFailure { + /** The replacement policies */ + public enum Policy { + /** The feature is disabled in the entire site. */ + DISABLE(Condition.FALSE), + /** Never add a new datanode. */ + NEVER(Condition.FALSE), + /** @see ReplaceDatanodeOnFailure.Condition#DEFAULT */ + DEFAULT(Condition.DEFAULT), + /** Always add a new datanode when an existing datanode is removed. */ + ALWAYS(Condition.TRUE); + + private final Condition condition; + + private Policy(Condition condition) { + this.condition = condition; + } + + Condition getCondition() { + return condition; + } + } + + /** Datanode replacement condition */ + private static interface Condition { + /** Return true unconditionally. */ + static final Condition TRUE = new Condition() { + @Override + public boolean satisfy(short replication, DatanodeInfo[] existings, + int nExistings, boolean isAppend, boolean isHflushed) { + return true; + } + }; + + /** Return false unconditionally. */ + static final Condition FALSE = new Condition() { + @Override + public boolean satisfy(short replication, DatanodeInfo[] existings, + int nExistings, boolean isAppend, boolean isHflushed) { + return false; + } + }; + + /** + * DEFAULT condition: + * Let r be the replication number. + * Let n be the number of existing datanodes. + * Add a new datanode only if r >= 3 and either + * (1) floor(r/2) >= n; or + * (2) r > n and the block is hflushed/appended. + */ + static final Condition DEFAULT = new Condition() { + @Override + public boolean satisfy(final short replication, + final DatanodeInfo[] existings, final int n, final boolean isAppend, + final boolean isHflushed) { + if (replication < 3) { + return false; + } else { + if (n <= (replication/2)) { + return true; + } else { + return isAppend || isHflushed; + } + } + } + }; + + /** Is the condition satisfied? 
+    public boolean satisfy(short replication, DatanodeInfo[] existings,
+        int nExistings, boolean isAppend, boolean isHflushed);
+  }
+
+  private final Policy policy;
+  private final boolean bestEffort;
+
+  public ReplaceDatanodeOnFailure(Policy policy, boolean bestEffort) {
+    this.policy = policy;
+    this.bestEffort = bestEffort;
+  }
 
   /** Check if the feature is enabled. */
   public void checkEnabled() {
-    if (this == DISABLE) {
+    if (policy == Policy.DISABLE) {
       throw new UnsupportedOperationException(
           "This feature is disabled. Please refer to "
           + DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY
@@ -56,7 +120,20 @@ public enum ReplaceDatanodeOnFailure {
     }
   }
 
-  /** Is the policy satisfied? */
+  /**
+   * Best effort means that the client will try to replace the failed datanode
+   * (provided that the policy is satisfied), however, it will continue the
+   * write operation in case that the datanode replacement also fails.
+   *
+   * @return Suppose the datanode replacement fails.
+   *     false: An exception should be thrown so that the write will fail.
+   *     true : The write should be resumed with the remaining datanodes.
+   */
+  public boolean isBestEffort() {
+    return bestEffort;
+  }
+
+  /** Does it need a replacement according to the policy? */
   public boolean satisfy(
       final short replication, final DatanodeInfo[] existings,
       final boolean isAppend, final boolean isHflushed) {
@@ -64,40 +141,42 @@ public enum ReplaceDatanodeOnFailure {
     if (n == 0 || n >= replication) {
       //don't need to add datanode for any policy.
       return false;
-    } else if (this == DISABLE || this == NEVER) {
-      return false;
-    } else if (this == ALWAYS) {
-      return true;
     } else {
-      //DEFAULT
-      if (replication < 3) {
-        return false;
-      } else {
-        if (n <= (replication/2)) {
-          return true;
-        } else {
-          return isAppend || isHflushed;
-        }
-      }
+      return policy.getCondition().satisfy(
+          replication, existings, n, isAppend, isHflushed);
     }
   }
+
+  @Override
+  public String toString() {
+    return policy.toString();
+  }
 
   /** Get the setting from configuration. */
   public static ReplaceDatanodeOnFailure get(final Configuration conf) {
+    final Policy policy = getPolicy(conf);
+    final boolean bestEffort = conf.getBoolean(
+        DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY,
+        DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_DEFAULT);
+
+    return new ReplaceDatanodeOnFailure(policy, bestEffort);
+  }
+
+  private static Policy getPolicy(final Configuration conf) {
     final boolean enabled = conf.getBoolean(
         DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
         DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT);
     if (!enabled) {
-      return DISABLE;
+      return Policy.DISABLE;
     }
 
     final String policy = conf.get(
         DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY,
         DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT);
-    for(int i = 1; i < values().length; i++) {
-      final ReplaceDatanodeOnFailure rdof = values()[i];
-      if (rdof.name().equalsIgnoreCase(policy)) {
-        return rdof;
+    for(int i = 1; i < Policy.values().length; i++) {
+      final Policy p = Policy.values()[i];
+      if (p.name().equalsIgnoreCase(policy)) {
+        return p;
       }
     }
     throw new HadoopIllegalArgumentException("Illegal configuration value for "
@@ -106,12 +185,16 @@ public enum ReplaceDatanodeOnFailure {
   }
 
   /** Write the setting to configuration. */
-  public void write(final Configuration conf) {
+  public static void write(final Policy policy,
+      final boolean bestEffort, final Configuration conf) {
     conf.setBoolean(
         DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
-        this != DISABLE);
+        policy != Policy.DISABLE);
     conf.set(
         DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY,
-        name());
+        policy.name());
+    conf.setBoolean(
+        DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY,
+        bestEffort);
   }
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 1cebdd7666c..76336eb50f6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -523,6 +523,28 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.client.block.write.replace-datanode-on-failure.best-effort</name>
+  <value>false</value>
+  <description>
+    This property is used only if the value of
+    dfs.client.block.write.replace-datanode-on-failure.enable is true.
+
+    Best effort means that the client will try to replace a failed datanode
+    in the write pipeline (provided that the policy is satisfied), however, it
+    continues the write operation in case that the datanode replacement also
+    fails.
+
+    Suppose the datanode replacement fails.
+    false: An exception should be thrown so that the write will fail.
+    true : The write should be resumed with the remaining datanodes.
+
+    Note that setting this property to true allows writing to a pipeline
+    with a smaller number of datanodes. As a result, it increases the
+    probability of data loss.
+  </description>
+</property>
+
 <property>
   <name>dfs.blockreport.intervalMsec</name>
   <value>21600000</value>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplaceDatanodeOnFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplaceDatanodeOnFailure.java
index a2908674c5c..91614175f42 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplaceDatanodeOnFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplaceDatanodeOnFailure.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
+import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure.Policy;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.log4j.Level;
 import org.junit.Assert;
@@ -55,7 +56,8 @@ public class TestReplaceDatanodeOnFailure {
   /** Test DEFAULT ReplaceDatanodeOnFailure policy. */
   @Test
   public void testDefaultPolicy() throws Exception {
-    final ReplaceDatanodeOnFailure p = ReplaceDatanodeOnFailure.DEFAULT;
+    final Configuration conf = new HdfsConfiguration();
+    final ReplaceDatanodeOnFailure p = ReplaceDatanodeOnFailure.get(conf);
 
     final DatanodeInfo[] infos = new DatanodeInfo[5];
     final DatanodeInfo[][] datanodes = new DatanodeInfo[infos.length + 1][];
@@ -114,7 +116,7 @@ public class TestReplaceDatanodeOnFailure {
     final Configuration conf = new HdfsConfiguration();
 
     //always replace a datanode
-    ReplaceDatanodeOnFailure.ALWAYS.write(conf);
+    ReplaceDatanodeOnFailure.write(Policy.ALWAYS, true, conf);
 
     final String[] racks = new String[REPLICATION];
     Arrays.fill(racks, RACK0);
@@ -240,8 +242,6 @@ public class TestReplaceDatanodeOnFailure {
     final Configuration conf = new HdfsConfiguration();
     final short REPLICATION = (short)3;
 
-    Assert.assertEquals(ReplaceDatanodeOnFailure.DEFAULT, ReplaceDatanodeOnFailure.get(conf));
-
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf
         ).numDataNodes(1).build();
 
@@ -286,4 +286,41 @@ public class TestReplaceDatanodeOnFailure {
       if (cluster != null) {cluster.shutdown();}
     }
   }
+
+  @Test
+  public void testBestEffort() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+
+    //always replace a datanode but do not throw exception
+    ReplaceDatanodeOnFailure.write(Policy.ALWAYS, true, conf);
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf
+        ).numDataNodes(1).build();
+
+    try {
+      final DistributedFileSystem fs = cluster.getFileSystem();
+      final Path f = new Path(DIR, "testIgnoreReplaceFailure");
+
+      final byte[] bytes = new byte[1000];
+      {
+        LOG.info("write " + bytes.length + " bytes to " + f);
+        final FSDataOutputStream out = fs.create(f, REPLICATION);
+        out.write(bytes);
+        out.close();
+
+        final FileStatus status = fs.getFileStatus(f);
+        Assert.assertEquals(REPLICATION, status.getReplication());
+        Assert.assertEquals(bytes.length, status.getLen());
+      }
+
+      {
+        LOG.info("append another " + bytes.length + " bytes to " + f);
+        final FSDataOutputStream out = fs.append(f);
+        out.write(bytes);
+        out.close();
+      }
+    } finally {
+      if (cluster != null) {cluster.shutdown();}
+    }
+  }
 }
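
Not part of the patch: for reviewers who want to try the new option, the sketch below shows how a client could enable the best-effort behavior through Configuration. It only uses the configuration keys touched by this change; the NameNode URI, class name, and output path are made-up placeholders.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BestEffortWriteExample {
  public static void main(String[] args) throws Exception {
    final Configuration conf = new Configuration();
    // Keep datanode replacement enabled and use the DEFAULT policy.
    conf.setBoolean(
        "dfs.client.block.write.replace-datanode-on-failure.enable", true);
    conf.set(
        "dfs.client.block.write.replace-datanode-on-failure.policy", "DEFAULT");
    // New in this patch: if adding a replacement datanode fails, log a warning
    // and continue writing with the remaining datanodes instead of failing.
    conf.setBoolean(
        "dfs.client.block.write.replace-datanode-on-failure.best-effort", true);

    // Placeholder URI and path, for illustration only.
    final FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), conf);
    final FSDataOutputStream out = fs.create(new Path("/tmp/best-effort-demo"));
    try {
      out.writeBytes("pipeline recovery is best-effort for this stream\n");
    } finally {
      out.close();
    }
  }
}

The trade-off, as the hdfs-default.xml description above notes, is that the pipeline may keep running with fewer datanodes, which increases the probability of data loss.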