HDFS-4257. The ReplaceDatanodeOnFailure policies could have a forgiving option. Contributed by szetszwo.

This commit is contained in:
Colin Patrick Mccabe 2014-09-02 15:14:33 -07:00
parent 0340206a29
commit 727331becc
6 changed files with 201 additions and 45 deletions

View File

@ -432,6 +432,8 @@ Release 2.6.0 - UNRELEASED
HDFS-6634. inotify in HDFS. (James Thomas via wang)
HDFS-4257. The ReplaceDatanodeOnFailure policies could have a forgiving
option (szetszwo via cmccabe)
OPTIMIZATIONS

View File

@ -53,6 +53,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT = true;
public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY = "dfs.client.block.write.replace-datanode-on-failure.policy";
public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT = "DEFAULT";
public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY = "dfs.client.block.write.replace-datanode-on-failure.best-effort";
public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_DEFAULT = false;
public static final String DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY = "dfs.client.socketcache.capacity";
public static final int DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16;
public static final String DFS_CLIENT_USE_DN_HOSTNAME = "dfs.client.use.datanode.hostname";

View File

@ -1178,7 +1178,17 @@ public class DFSOutputStream extends FSOutputSummer
// Check if replace-datanode policy is satisfied.
if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(blockReplication,
nodes, isAppend, isHflushed)) {
addDatanode2ExistingPipeline();
try {
addDatanode2ExistingPipeline();
} catch(IOException ioe) {
if (!dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) {
throw ioe;
}
DFSClient.LOG.warn("Failed to replace datanode."
+ " Continue with the remaining datanodes since "
+ DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY
+ " is set to true.", ioe);
}
}
// get a new generation stamp and an access token

View File

@ -29,26 +29,90 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public enum ReplaceDatanodeOnFailure {
/** The feature is disabled in the entire site. */
DISABLE,
/** Never add a new datanode. */
NEVER,
/**
* DEFAULT policy:
* Let r be the replication number.
* Let n be the number of existing datanodes.
* Add a new datanode only if r >= 3 and either
* (1) floor(r/2) >= n; or
* (2) r > n and the block is hflushed/appended.
*/
DEFAULT,
/** Always add a new datanode when an existing datanode is removed. */
ALWAYS;
public class ReplaceDatanodeOnFailure {
/** The replacement policies */
public enum Policy {
/** The feature is disabled in the entire site. */
DISABLE(Condition.FALSE),
/** Never add a new datanode. */
NEVER(Condition.FALSE),
/** @see ReplaceDatanodeOnFailure.Condition#DEFAULT */
DEFAULT(Condition.DEFAULT),
/** Always add a new datanode when an existing datanode is removed. */
ALWAYS(Condition.TRUE);
private final Condition condition;
private Policy(Condition condition) {
this.condition = condition;
}
Condition getCondition() {
return condition;
}
}
/** Datanode replacement condition */
private static interface Condition {
/** Return true unconditionally. */
static final Condition TRUE = new Condition() {
@Override
public boolean satisfy(short replication, DatanodeInfo[] existings,
int nExistings, boolean isAppend, boolean isHflushed) {
return true;
}
};
/** Return false unconditionally. */
static final Condition FALSE = new Condition() {
@Override
public boolean satisfy(short replication, DatanodeInfo[] existings,
int nExistings, boolean isAppend, boolean isHflushed) {
return false;
}
};
/**
* DEFAULT condition:
* Let r be the replication number.
* Let n be the number of existing datanodes.
* Add a new datanode only if r >= 3 and either
* (1) floor(r/2) >= n; or
* (2) r > n and the block is hflushed/appended.
*/
static final Condition DEFAULT = new Condition() {
@Override
public boolean satisfy(final short replication,
final DatanodeInfo[] existings, final int n, final boolean isAppend,
final boolean isHflushed) {
if (replication < 3) {
return false;
} else {
if (n <= (replication/2)) {
return true;
} else {
return isAppend || isHflushed;
}
}
}
};
/** Is the condition satisfied? */
public boolean satisfy(short replication, DatanodeInfo[] existings,
int nExistings, boolean isAppend, boolean isHflushed);
}
private final Policy policy;
private final boolean bestEffort;
public ReplaceDatanodeOnFailure(Policy policy, boolean bestEffort) {
this.policy = policy;
this.bestEffort = bestEffort;
}
/** Check if the feature is enabled. */
public void checkEnabled() {
if (this == DISABLE) {
if (policy == Policy.DISABLE) {
throw new UnsupportedOperationException(
"This feature is disabled. Please refer to "
+ DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY
@ -56,7 +120,20 @@ public enum ReplaceDatanodeOnFailure {
}
}
/** Is the policy satisfied? */
/**
* Best effort means that the client will try to replace the failed datanode
* (provided that the policy is satisfied), however, it will continue the
* write operation in case that the datanode replacement also fails.
*
* @return Suppose the datanode replacement fails.
* false: An exception should be thrown so that the write will fail.
* true : The write should be resumed with the remaining datandoes.
*/
public boolean isBestEffort() {
return bestEffort;
}
/** Does it need a replacement according to the policy? */
public boolean satisfy(
final short replication, final DatanodeInfo[] existings,
final boolean isAppend, final boolean isHflushed) {
@ -64,40 +141,42 @@ public enum ReplaceDatanodeOnFailure {
if (n == 0 || n >= replication) {
//don't need to add datanode for any policy.
return false;
} else if (this == DISABLE || this == NEVER) {
return false;
} else if (this == ALWAYS) {
return true;
} else {
//DEFAULT
if (replication < 3) {
return false;
} else {
if (n <= (replication/2)) {
return true;
} else {
return isAppend || isHflushed;
}
}
return policy.getCondition().satisfy(
replication, existings, n, isAppend, isHflushed);
}
}
@Override
public String toString() {
return policy.toString();
}
/** Get the setting from configuration. */
public static ReplaceDatanodeOnFailure get(final Configuration conf) {
final Policy policy = getPolicy(conf);
final boolean bestEffort = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY,
DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_DEFAULT);
return new ReplaceDatanodeOnFailure(policy, bestEffort);
}
private static Policy getPolicy(final Configuration conf) {
final boolean enabled = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT);
if (!enabled) {
return DISABLE;
return Policy.DISABLE;
}
final String policy = conf.get(
DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY,
DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT);
for(int i = 1; i < values().length; i++) {
final ReplaceDatanodeOnFailure rdof = values()[i];
if (rdof.name().equalsIgnoreCase(policy)) {
return rdof;
for(int i = 1; i < Policy.values().length; i++) {
final Policy p = Policy.values()[i];
if (p.name().equalsIgnoreCase(policy)) {
return p;
}
}
throw new HadoopIllegalArgumentException("Illegal configuration value for "
@ -106,12 +185,16 @@ public enum ReplaceDatanodeOnFailure {
}
/** Write the setting to configuration. */
public void write(final Configuration conf) {
public static void write(final Policy policy,
final boolean bestEffort, final Configuration conf) {
conf.setBoolean(
DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
this != DISABLE);
policy != Policy.DISABLE);
conf.set(
DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY,
name());
policy.name());
conf.setBoolean(
DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY,
bestEffort);
}
}

View File

@ -523,6 +523,28 @@
</description>
</property>
<property>
<name>dfs.client.block.write.replace-datanode-on-failure.best-effort</name>
<value>false</value>
<description>
This property is used only if the value of
dfs.client.block.write.replace-datanode-on-failure.enable is true.
Best effort means that the client will try to replace a failed datanode
in write pipeline (provided that the policy is satisfied), however, it
continues the write operation in case that the datanode replacement also
fails.
Suppose the datanode replacement fails.
false: An exception should be thrown so that the write will fail.
true : The write should be resumed with the remaining datandoes.
Note that setting this property to true allows writing to a pipeline
with a smaller number of datanodes. As a result, it increases the
probability of data loss.
</description>
</property>
<property>
<name>dfs.blockreport.intervalMsec</name>
<value>21600000</value>

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure.Policy;
import org.apache.hadoop.io.IOUtils;
import org.apache.log4j.Level;
import org.junit.Assert;
@ -54,7 +55,8 @@ public class TestReplaceDatanodeOnFailure {
/** Test DEFAULT ReplaceDatanodeOnFailure policy. */
@Test
public void testDefaultPolicy() throws Exception {
final ReplaceDatanodeOnFailure p = ReplaceDatanodeOnFailure.DEFAULT;
final Configuration conf = new HdfsConfiguration();
final ReplaceDatanodeOnFailure p = ReplaceDatanodeOnFailure.get(conf);
final DatanodeInfo[] infos = new DatanodeInfo[5];
final DatanodeInfo[][] datanodes = new DatanodeInfo[infos.length + 1][];
@ -113,7 +115,7 @@ public class TestReplaceDatanodeOnFailure {
final Configuration conf = new HdfsConfiguration();
//always replace a datanode
ReplaceDatanodeOnFailure.ALWAYS.write(conf);
ReplaceDatanodeOnFailure.write(Policy.ALWAYS, true, conf);
final String[] racks = new String[REPLICATION];
Arrays.fill(racks, RACK0);
@ -239,8 +241,6 @@ public class TestReplaceDatanodeOnFailure {
final Configuration conf = new HdfsConfiguration();
final short REPLICATION = (short)3;
Assert.assertEquals(ReplaceDatanodeOnFailure.DEFAULT, ReplaceDatanodeOnFailure.get(conf));
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf
).numDataNodes(1).build();
@ -285,4 +285,41 @@ public class TestReplaceDatanodeOnFailure {
if (cluster != null) {cluster.shutdown();}
}
}
@Test
public void testBestEffort() throws Exception {
final Configuration conf = new HdfsConfiguration();
//always replace a datanode but do not throw exception
ReplaceDatanodeOnFailure.write(Policy.ALWAYS, true, conf);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf
).numDataNodes(1).build();
try {
final DistributedFileSystem fs = cluster.getFileSystem();
final Path f = new Path(DIR, "testIgnoreReplaceFailure");
final byte[] bytes = new byte[1000];
{
LOG.info("write " + bytes.length + " bytes to " + f);
final FSDataOutputStream out = fs.create(f, REPLICATION);
out.write(bytes);
out.close();
final FileStatus status = fs.getFileStatus(f);
Assert.assertEquals(REPLICATION, status.getReplication());
Assert.assertEquals(bytes.length, status.getLen());
}
{
LOG.info("append another " + bytes.length + " bytes to " + f);
final FSDataOutputStream out = fs.append(f);
out.write(bytes);
out.close();
}
} finally {
if (cluster != null) {cluster.shutdown();}
}
}
}