+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
+import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure.Policy;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Verify the behaviour of HdfsClientConfigKeys.BlockWrite.
+ * ReplaceDatanodeOnFailure.MIN_REPLICATION. If the number of live datanodes
+ * holding block locations is greater than or equal to the
+ * 'dfs.client.block.write.replace-datanode-on-failure.min.replication'
+ * threshold, the client keeps writing to the remaining nodes; otherwise an
+ * exception is thrown.
+ *
+ * If HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.MIN_REPLICATION
+ * is set to zero or a negative value, an exception is thrown when a
+ * replacement datanode cannot be found.
+ */
+public class TestReplaceDatanodeFailureReplication {
+  static final Log LOG = LogFactory
+      .getLog(TestReplaceDatanodeFailureReplication.class);
+
+  static final String DIR =
+      "/" + TestReplaceDatanodeFailureReplication.class.getSimpleName() + "/";
+  static final short REPLICATION = 3;
+  final private static String RACK0 = "/rack0";
+
+  /**
+   * Test failure of the last datanode in the pipeline.
+   */
+  @Test
+  public void testLastDatanodeFailureInPipeline() throws Exception {
+    testWriteFileAndVerifyAfterDNStop(2, 1, 10, false);
+  }
+
+  /**
+   * Test failure of the first datanode in the pipeline.
+   */
+  @Test
+  public void testFirstDatanodeFailureInPipeline() throws Exception {
+    testWriteFileAndVerifyAfterDNStop(2, 0, 10, false);
+  }
+
+  /**
+   * Test failure of all datanodes except the first in the pipeline.
+   */
+  @Test
+  public void testWithOnlyFirstDatanodeIsAlive() throws Exception {
+    testWriteFileAndVerifyAfterDNStop(1, 1, 1, true);
+  }
+
+  /**
+   * Test failure of all datanodes except the last in the pipeline.
+   */
+  @Test
+  public void testWithOnlyLastDatanodeIsAlive() throws Exception {
+    testWriteFileAndVerifyAfterDNStop(1, 0, 1, true);
+  }
+
+  /**
+   * Test when the number of live nodes is less than the
+   * "dfs.client.block.write.replace-datanode-on-failure.min.replication"
+   * threshold.
+   */
+  @Test
+  public void testLessNumberOfLiveDatanodesThanWriteReplaceDatanodeOnFailureRF()
+      throws Exception {
+    final MiniDFSCluster cluster = setupCluster(2);
+
+    try {
+      final DistributedFileSystem fs = cluster.getFileSystem();
+      final Path dir = new Path(DIR);
+
+      final SlowWriter[] slowwriters = new SlowWriter[1];
+      for (int i = 1; i <= slowwriters.length; i++) {
+        // create slow writers at different speeds
+        slowwriters[i - 1] = new SlowWriter(fs, new Path(dir, "file" + i),
+            i * 200L);
+      }
+
+      for (SlowWriter s : slowwriters) {
+        s.start();
+      }
+
+      // Let the slow writers write something.
+      // Some of them are so slow that they have not started yet.
+      sleepSeconds(1);
+
+      // stop two datanodes, leaving fewer live nodes than the configured
+      // min replication of 2
+      cluster.stopDataNode(0);
+      cluster.stopDataNode(0);
+
+      // Let the slow writers write for a few more seconds.
+      // Everyone should have written something.
+      sleepSeconds(20);
+
+      // check replication and interrupt.
+      for (SlowWriter s : slowwriters) {
+        try {
+          s.out.getCurrentBlockReplication();
+          Assert.fail(
+              "Must throw exception as failed to add a new datanode for write "
+                  + "pipeline, minimum failure replication");
+        } catch (IOException e) {
+          // expected
+        }
+        s.interruptRunning();
+      }
+
+      // close files
+      for (SlowWriter s : slowwriters) {
+        s.joinAndClose();
+      }
+
+      // Verify the files
+      verifyFileContent(fs, slowwriters);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  private MiniDFSCluster setupCluster(int failRF) throws IOException {
+    final Configuration conf = new HdfsConfiguration();
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        MIN_REPLICATION, failRF);
+    // always replace a datanode
+    ReplaceDatanodeOnFailure.write(Policy.ALWAYS, false, conf);
+
+    final String[] racks = new String[REPLICATION];
+    Arrays.fill(racks, RACK0);
+    return new MiniDFSCluster.Builder(conf).racks(racks)
+        .numDataNodes(REPLICATION).build();
+  }
+
+  private void testWriteFileAndVerifyAfterDNStop(int failRF, int dnindex,
+      int slowWrites, boolean failPipeLine)
+      throws IOException, InterruptedException, TimeoutException {
+    final MiniDFSCluster cluster = setupCluster(failRF);
+    try {
+      final DistributedFileSystem fs = cluster.getFileSystem();
+      final Path dir = new Path(DIR);
+
+      final SlowWriter[] slowwriters = new SlowWriter[slowWrites];
+      for (int i = 1; i <= slowwriters.length; i++) {
+        // create slow writers at different speeds
+        slowwriters[i - 1] = new SlowWriter(fs, new Path(dir, "file" + i),
+            i * 200L);
+      }
+
+      for (SlowWriter s : slowwriters) {
+        s.start();
+      }
+
+      // Let the slow writers write something.
+      // Some of them are so slow that they have not started yet.
+      sleepSeconds(3);
+
+      // stop a datanode
+      cluster.stopDataNode(dnindex);
+      if (failPipeLine) {
+        cluster.stopDataNode(dnindex);
+      }
+
+      // Let the slow writers write for a few more seconds.
+      // Everyone should have written something.
+      sleepSeconds(5);
+      cluster.waitFirstBRCompleted(0, 10000);
+      // check replication and interrupt.
+      for (SlowWriter s : slowwriters) {
+        Assert.assertEquals(failRF, s.out.getCurrentBlockReplication());
+        s.interruptRunning();
+      }
+
+      // close files
+      for (SlowWriter s : slowwriters) {
+        s.joinAndClose();
+      }
+
+      // Verify the files
+      verifyFileContent(fs, slowwriters);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  private void verifyFileContent(DistributedFileSystem fs,
+      SlowWriter[] slowwriters) throws IOException {
+    LOG.info("Verify the file");
+    for (int i = 0; i < slowwriters.length; i++) {
+      LOG.info(slowwriters[i].filepath + ": length=" + fs
+          .getFileStatus(slowwriters[i].filepath).getLen());
+      FSDataInputStream in = null;
+      try {
+        in = fs.open(slowwriters[i].filepath);
+        for (int j = 0, x;; j++) {
+          x = in.read();
+          if ((x) != -1) {
+            Assert.assertEquals(j, x);
+          } else {
+            return;
+          }
+        }
+      } finally {
+        IOUtils.closeStream(in);
+      }
+    }
+  }
+
+  static void sleepSeconds(final int waittime) throws InterruptedException {
+    LOG.info("Wait " + waittime + " seconds");
+    Thread.sleep(waittime * 1000L);
+  }
+
+  static class SlowWriter extends Thread {
+    private final Path filepath;
+    private final HdfsDataOutputStream out;
+    private final long sleepms;
+    private volatile boolean running = true;
+
+    SlowWriter(DistributedFileSystem fs, Path filepath, final long sleepms)
+        throws IOException {
+      super(SlowWriter.class.getSimpleName() + ":" + filepath);
+      this.filepath = filepath;
+      this.out = (HdfsDataOutputStream) fs.create(filepath, REPLICATION);
+      this.sleepms = sleepms;
+    }
+
+    @Override public void run() {
+      int i = 0;
+      try {
+        sleep(sleepms);
+        for (; running; i++) {
+          LOG.info(getName() + " writes " + i);
+          out.write(i);
+          out.hflush();
+          sleep(sleepms);
+        }
+      } catch (InterruptedException e) {
+        LOG.info(getName() + " interrupted:" + e);
+      } catch (IOException e) {
+        throw new RuntimeException(getName(), e);
+      } finally {
+        LOG.info(getName() + " terminated: i=" + i);
+      }
+    }
+
+    void interruptRunning() {
+      running = false;
+      interrupt();
+    }
+
+    void joinAndClose() throws InterruptedException {
+      LOG.info(getName() + " join and close");
+      join();
+      IOUtils.closeStream(out);
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
index 233dc5aa513..47db565e37f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
@@ -41,8 +41,8 @@ public class TestHdfsConfigFields extends TestConfigurationFieldsBase {
   public void initializeMemberVariables() {
     xmlFilename = new String("hdfs-default.xml");
     configurationClasses = new Class[] { HdfsClientConfigKeys.class,
-        HdfsClientConfigKeys.StripedRead.class,
-        DFSConfigKeys.class};
+        HdfsClientConfigKeys.StripedRead.class, DFSConfigKeys.class,
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.class };

     // Set error modes
     errorIfMissingConfigProps = true;
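
For context, a minimal sketch (not part of the patch) of how a client application might use the setting exercised by the new test. The class name MinReplicationClientSketch and the value 1 are purely illustrative assumptions; the only key used is the same HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.MIN_REPLICATION constant that the test sets via conf.setInt():

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

public class MinReplicationClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    // When a replacement datanode cannot be found during pipeline recovery,
    // keep writing as long as at least this many live nodes remain in the
    // pipeline, instead of failing the write (1 is an illustrative value).
    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure
        .MIN_REPLICATION, 1);
    FileSystem fs = FileSystem.get(conf);
    // ... create files and write as usual, e.g. fs.create(path) and hflush().
  }
}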