HDFS-4257. The ReplaceDatanodeOnFailure policies could have a forgiving option. Contributed by szetszwo.

(cherry picked from commit 727331becc)

parent 9daf035fbc
commit 4c17d4d975
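Note for anyone picking this change up: the forgiving behavior is opt-in on the client. A minimal sketch of enabling it — the three key strings are exactly the ones in DFSConfigKeys below; the class and setup around them are illustrative, not part of this commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class EnableBestEffortExample {
  public static void main(String[] args) {
    // Illustrative sketch: opt in to the forgiving (best-effort) behavior.
    // The key names come from the DFSConfigKeys hunk in this commit.
    final Configuration conf = new HdfsConfiguration();
    conf.setBoolean(
        "dfs.client.block.write.replace-datanode-on-failure.enable", true);
    conf.set(
        "dfs.client.block.write.replace-datanode-on-failure.policy", "DEFAULT");
    conf.setBoolean(
        "dfs.client.block.write.replace-datanode-on-failure.best-effort", true);
    // A FileSystem created from this conf will log and continue, instead of
    // failing the write, when a replacement datanode cannot be added.
  }
}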
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -173,6 +173,8 @@ Release 2.6.0 - UNRELEASED
 
     HDFS-6634. inotify in HDFS. (James Thomas via wang)
 
+    HDFS-4257. The ReplaceDatanodeOnFailure policies could have a forgiving
+    option (szetszwo via cmccabe)
 
   OPTIMIZATIONS
 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -53,6 +53,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT = true;
   public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY = "dfs.client.block.write.replace-datanode-on-failure.policy";
   public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT = "DEFAULT";
+  public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY = "dfs.client.block.write.replace-datanode-on-failure.best-effort";
+  public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_DEFAULT = false;
   public static final String DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY = "dfs.client.socketcache.capacity";
   public static final int DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16;
   public static final String DFS_CLIENT_USE_DN_HOSTNAME = "dfs.client.use.datanode.hostname";
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -1178,7 +1178,17 @@ public class DFSOutputStream extends FSOutputSummer
       // Check if replace-datanode policy is satisfied.
       if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(blockReplication,
           nodes, isAppend, isHflushed)) {
-        addDatanode2ExistingPipeline();
+        try {
+          addDatanode2ExistingPipeline();
+        } catch(IOException ioe) {
+          if (!dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) {
+            throw ioe;
+          }
+          DFSClient.LOG.warn("Failed to replace datanode."
+              + " Continue with the remaining datanodes since "
+              + DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY
+              + " is set to true.", ioe);
+        }
       }
 
       // get a new generation stamp and an access token
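The observable contract of this hunk, as a hedged sketch: only the catch-and-continue semantics come from the patch; the path, cluster, and class name below are hypothetical.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BestEffortView {
  public static void main(String[] args) throws IOException {
    final Configuration conf = new Configuration();
    final FileSystem fs = FileSystem.get(conf);
    final FSDataOutputStream out = fs.create(new Path("/tmp/demo"));
    try {
      out.write(new byte[1000]);
      out.hflush();  // pipeline recovery may run here after a datanode failure
      // best-effort true: a failed datanode replacement is logged by
      // DFSClient and the write continues on the remaining datanodes.
    } catch (IOException ioe) {
      // best-effort false (the default): the failed replacement surfaces
      // here and the write fails with it.
      throw ioe;
    } finally {
      out.close();
    }
  }
}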
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java
@@ -29,26 +29,90 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public enum ReplaceDatanodeOnFailure {
-  /** The feature is disabled in the entire site. */
-  DISABLE,
-  /** Never add a new datanode. */
-  NEVER,
-  /**
-   * DEFAULT policy:
-   *   Let r be the replication number.
-   *   Let n be the number of existing datanodes.
-   *   Add a new datanode only if r >= 3 and either
-   *   (1) floor(r/2) >= n; or
-   *   (2) r > n and the block is hflushed/appended.
-   */
-  DEFAULT,
-  /** Always add a new datanode when an existing datanode is removed. */
-  ALWAYS;
+public class ReplaceDatanodeOnFailure {
+  /** The replacement policies */
+  public enum Policy {
+    /** The feature is disabled in the entire site. */
+    DISABLE(Condition.FALSE),
+    /** Never add a new datanode. */
+    NEVER(Condition.FALSE),
+    /** @see ReplaceDatanodeOnFailure.Condition#DEFAULT */
+    DEFAULT(Condition.DEFAULT),
+    /** Always add a new datanode when an existing datanode is removed. */
+    ALWAYS(Condition.TRUE);
+
+    private final Condition condition;
+
+    private Policy(Condition condition) {
+      this.condition = condition;
+    }
+
+    Condition getCondition() {
+      return condition;
+    }
+  }
+
+  /** Datanode replacement condition */
+  private static interface Condition {
+    /** Return true unconditionally. */
+    static final Condition TRUE = new Condition() {
+      @Override
+      public boolean satisfy(short replication, DatanodeInfo[] existings,
+          int nExistings, boolean isAppend, boolean isHflushed) {
+        return true;
+      }
+    };
+
+    /** Return false unconditionally. */
+    static final Condition FALSE = new Condition() {
+      @Override
+      public boolean satisfy(short replication, DatanodeInfo[] existings,
+          int nExistings, boolean isAppend, boolean isHflushed) {
+        return false;
+      }
+    };
+
+    /**
+     * DEFAULT condition:
+     *   Let r be the replication number.
+     *   Let n be the number of existing datanodes.
+     *   Add a new datanode only if r >= 3 and either
+     *   (1) floor(r/2) >= n; or
+     *   (2) r > n and the block is hflushed/appended.
+     */
+    static final Condition DEFAULT = new Condition() {
+      @Override
+      public boolean satisfy(final short replication,
+          final DatanodeInfo[] existings, final int n, final boolean isAppend,
+          final boolean isHflushed) {
+        if (replication < 3) {
+          return false;
+        } else {
+          if (n <= (replication/2)) {
+            return true;
+          } else {
+            return isAppend || isHflushed;
+          }
+        }
+      }
+    };
+
+    /** Is the condition satisfied? */
+    public boolean satisfy(short replication, DatanodeInfo[] existings,
+        int nExistings, boolean isAppend, boolean isHflushed);
+  }
+
+  private final Policy policy;
+  private final boolean bestEffort;
+
+  public ReplaceDatanodeOnFailure(Policy policy, boolean bestEffort) {
+    this.policy = policy;
+    this.bestEffort = bestEffort;
+  }
 
   /** Check if the feature is enabled. */
   public void checkEnabled() {
-    if (this == DISABLE) {
+    if (policy == Policy.DISABLE) {
       throw new UnsupportedOperationException(
           "This feature is disabled. Please refer to "
           + DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY
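A worked restatement of the DEFAULT condition may help review. The helper below is illustrative only (not part of the patch) and mirrors the arithmetic in Condition.DEFAULT, with r = replication and n = datanodes left in the pipeline:

public class DefaultPolicyDemo {
  static boolean satisfied(int r, int n, boolean appendOrHflushed) {
    if (r < 3) {
      return false;               // r = 1 or 2: never add a replacement
    }
    return n <= r / 2             // half or more of the pipeline is gone
        || appendOrHflushed;      // or already-visible data is at stake
  }

  public static void main(String[] args) {
    System.out.println(satisfied(3, 1, false)); // true:  1 <= floor(3/2) = 1
    System.out.println(satisfied(3, 2, false)); // false: 2 > 1 on a plain write
    System.out.println(satisfied(3, 2, true));  // true:  block hflushed/appended
    System.out.println(satisfied(2, 1, false)); // false: r < 3, despite a failure
  }
}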
@@ -56,7 +120,20 @@ public enum ReplaceDatanodeOnFailure
     }
   }
 
-  /** Is the policy satisfied? */
+  /**
+   * Best effort means that the client will try to replace the failed datanode
+   * (provided that the policy is satisfied), however, it will continue the
+   * write operation in case that the datanode replacement also fails.
+   *
+   * @return Suppose the datanode replacement fails.
+   *     false: An exception should be thrown so that the write will fail.
+   *     true : The write should be resumed with the remaining datanodes.
+   */
+  public boolean isBestEffort() {
+    return bestEffort;
+  }
+
+  /** Does it need a replacement according to the policy? */
   public boolean satisfy(
       final short replication, final DatanodeInfo[] existings,
       final boolean isAppend, final boolean isHflushed) {
@@ -64,40 +141,42 @@ public enum ReplaceDatanodeOnFailure
     if (n == 0 || n >= replication) {
       //don't need to add datanode for any policy.
       return false;
-    } else if (this == DISABLE || this == NEVER) {
-      return false;
-    } else if (this == ALWAYS) {
-      return true;
-    } else {
-      //DEFAULT
-      if (replication < 3) {
-        return false;
-      } else {
-        if (n <= (replication/2)) {
-          return true;
-        } else {
-          return isAppend || isHflushed;
-        }
-      }
+    } else {
+      return policy.getCondition().satisfy(
+          replication, existings, n, isAppend, isHflushed);
     }
   }
 
+  @Override
+  public String toString() {
+    return policy.toString();
+  }
+
   /** Get the setting from configuration. */
   public static ReplaceDatanodeOnFailure get(final Configuration conf) {
+    final Policy policy = getPolicy(conf);
+    final boolean bestEffort = conf.getBoolean(
+        DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY,
+        DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_DEFAULT);
+
+    return new ReplaceDatanodeOnFailure(policy, bestEffort);
+  }
+
+  private static Policy getPolicy(final Configuration conf) {
     final boolean enabled = conf.getBoolean(
         DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
         DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT);
     if (!enabled) {
-      return DISABLE;
+      return Policy.DISABLE;
     }
 
     final String policy = conf.get(
         DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY,
         DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT);
-    for(int i = 1; i < values().length; i++) {
-      final ReplaceDatanodeOnFailure rdof = values()[i];
-      if (rdof.name().equalsIgnoreCase(policy)) {
-        return rdof;
+    for(int i = 1; i < Policy.values().length; i++) {
+      final Policy p = Policy.values()[i];
+      if (p.name().equalsIgnoreCase(policy)) {
+        return p;
       }
     }
     throw new HadoopIllegalArgumentException("Illegal configuration value for "
@@ -106,12 +185,16 @@ public enum ReplaceDatanodeOnFailure
   }
 
   /** Write the setting to configuration. */
-  public void write(final Configuration conf) {
+  public static void write(final Policy policy,
+      final boolean bestEffort, final Configuration conf) {
     conf.setBoolean(
         DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
-        this != DISABLE);
+        policy != Policy.DISABLE);
     conf.set(
         DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY,
-        name());
+        policy.name());
+    conf.setBoolean(
+        DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY,
+        bestEffort);
   }
 }
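The instance method write(Configuration) becomes the static write(Policy, boolean, Configuration), paired with get(Configuration). A quick round-trip sketch against the patched class — the class name RdofRoundTrip is illustrative; the API calls are the ones added above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure.Policy;

public class RdofRoundTrip {
  public static void main(String[] args) {
    final Configuration conf = new Configuration();
    // Persist a policy plus the new best-effort flag...
    ReplaceDatanodeOnFailure.write(Policy.DEFAULT, true, conf);
    // ...and read the effective setting back.
    final ReplaceDatanodeOnFailure rdof = ReplaceDatanodeOnFailure.get(conf);
    // toString() delegates to the policy, so this prints "DEFAULT, bestEffort=true".
    System.out.println(rdof + ", bestEffort=" + rdof.isBestEffort());
  }
}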
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -523,6 +523,28 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.client.block.write.replace-datanode-on-failure.best-effort</name>
+  <value>false</value>
+  <description>
+    This property is used only if the value of
+    dfs.client.block.write.replace-datanode-on-failure.enable is true.
+
+    Best effort means that the client will try to replace a failed datanode
+    in the write pipeline (provided that the policy is satisfied), however,
+    it continues the write operation in case that the datanode replacement
+    also fails.
+
+    Suppose the datanode replacement fails.
+    false: An exception should be thrown so that the write will fail.
+    true : The write should be resumed with the remaining datanodes.
+
+    Note that setting this property to true allows writing to a pipeline
+    with a smaller number of datanodes. As a result, it increases the
+    probability of data loss.
+  </description>
+</property>
+
 <property>
   <name>dfs.blockreport.intervalMsec</name>
   <value>21600000</value>
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplaceDatanodeOnFailure.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
+import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure.Policy;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.log4j.Level;
 import org.junit.Assert;
@@ -55,7 +56,8 @@ public class TestReplaceDatanodeOnFailure {
   /** Test DEFAULT ReplaceDatanodeOnFailure policy. */
   @Test
   public void testDefaultPolicy() throws Exception {
-    final ReplaceDatanodeOnFailure p = ReplaceDatanodeOnFailure.DEFAULT;
+    final Configuration conf = new HdfsConfiguration();
+    final ReplaceDatanodeOnFailure p = ReplaceDatanodeOnFailure.get(conf);
 
     final DatanodeInfo[] infos = new DatanodeInfo[5];
     final DatanodeInfo[][] datanodes = new DatanodeInfo[infos.length + 1][];
@@ -114,7 +116,7 @@ public class TestReplaceDatanodeOnFailure {
     final Configuration conf = new HdfsConfiguration();
 
     //always replace a datanode
-    ReplaceDatanodeOnFailure.ALWAYS.write(conf);
+    ReplaceDatanodeOnFailure.write(Policy.ALWAYS, true, conf);
 
     final String[] racks = new String[REPLICATION];
     Arrays.fill(racks, RACK0);
@@ -240,8 +242,6 @@ public class TestReplaceDatanodeOnFailure {
     final Configuration conf = new HdfsConfiguration();
     final short REPLICATION = (short)3;
 
-    Assert.assertEquals(ReplaceDatanodeOnFailure.DEFAULT, ReplaceDatanodeOnFailure.get(conf));
-
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf
         ).numDataNodes(1).build();
@@ -286,4 +286,41 @@ public class TestReplaceDatanodeOnFailure {
       if (cluster != null) {cluster.shutdown();}
     }
   }
+
+  @Test
+  public void testBestEffort() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+
+    //always replace a datanode but do not throw exception
+    ReplaceDatanodeOnFailure.write(Policy.ALWAYS, true, conf);
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf
+        ).numDataNodes(1).build();
+
+    try {
+      final DistributedFileSystem fs = cluster.getFileSystem();
+      final Path f = new Path(DIR, "testIgnoreReplaceFailure");
+
+      final byte[] bytes = new byte[1000];
+      {
+        LOG.info("write " + bytes.length + " bytes to " + f);
+        final FSDataOutputStream out = fs.create(f, REPLICATION);
+        out.write(bytes);
+        out.close();
+
+        final FileStatus status = fs.getFileStatus(f);
+        Assert.assertEquals(REPLICATION, status.getReplication());
+        Assert.assertEquals(bytes.length, status.getLen());
+      }
+
+      {
+        LOG.info("append another " + bytes.length + " bytes to " + f);
+        final FSDataOutputStream out = fs.append(f);
+        out.write(bytes);
+        out.close();
+      }
+    } finally {
+      if (cluster != null) {cluster.shutdown();}
+    }
+  }
 }