diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 4667b4b788f..8bbe4f37cb0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -118,6 +118,39 @@ void addToDeadNodes(DatanodeInfo dnInfo) {
    * Grab the open-file info from namenode
    */
   synchronized void openInfo() throws IOException, UnresolvedLinkException {
+    lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
+    int retriesForLastBlockLength = 3;
+    while (retriesForLastBlockLength > 0) {
+      // Getting last block length as -1 is a special case. When the cluster
+      // restarts, DNs may not report immediately. At this time partial block
+      // locations will not be available with the NN for getting the length.
+      // Let's retry 3 times to get the length.
+      if (lastBlockBeingWrittenLength == -1) {
+        DFSClient.LOG.warn("Last block locations not available. "
+            + "Datanodes might not have reported blocks completely."
+            + " Will retry for " + retriesForLastBlockLength + " times");
+        waitFor(4000);
+        lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
+      } else {
+        break;
+      }
+      retriesForLastBlockLength--;
+    }
+    if (retriesForLastBlockLength == 0) {
+      throw new IOException("Could not obtain the last block locations.");
+    }
+  }
+
+  private void waitFor(int waitTime) throws IOException {
+    try {
+      Thread.sleep(waitTime);
+    } catch (InterruptedException e) {
+      throw new IOException(
+          "Interrupted while getting the last block length.");
+    }
+  }
+
+  private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
     LocatedBlocks newInfo = DFSClient.callGetBlockLocations(dfsClient.namenode, src, 0, prefetchSize);
     if (DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug("newInfo = " + newInfo);
@@ -136,10 +169,13 @@ synchronized void openInfo() throws IOException, UnresolvedLinkException {
       }
     }
     locatedBlocks = newInfo;
-    lastBlockBeingWrittenLength = 0;
+    long lastBlockBeingWrittenLength = 0;
     if (!locatedBlocks.isLastBlockComplete()) {
       final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
       if (last != null) {
+        if (last.getLocations().length == 0) {
+          return -1;
+        }
         final long len = readBlockLength(last);
         last.getBlock().setNumBytes(len);
         lastBlockBeingWrittenLength = len;
@@ -147,13 +183,12 @@ synchronized void openInfo() throws IOException, UnresolvedLinkException {
     }
 
     currentNode = null;
+    return lastBlockBeingWrittenLength;
   }
 
   /** Read the block length from one of the datanodes. */
   private long readBlockLength(LocatedBlock locatedblock) throws IOException {
-    if (locatedblock == null || locatedblock.getLocations().length == 0) {
-      return 0;
-    }
+    assert locatedblock != null : "LocatedBlock cannot be null";
     int replicaNotFoundCount = locatedblock.getLocations().length;
 
     for(DatanodeInfo datanode : locatedblock.getLocations()) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileLengthOnClusterRestart.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileLengthOnClusterRestart.java
new file mode 100644
index 00000000000..5f9ad3259d4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileLengthOnClusterRestart.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Tests the file length across cluster restarts. */
+public class TestFileLengthOnClusterRestart {
+  /**
+   * Tests the file length when we hsync the file, restart the cluster, and
+   * the Datanodes have not yet reported to the Namenode.
+   */
+  @Test(timeout = 60000)
+  public void testFileLengthWithHSyncAndClusterRestartWithOutDNsRegister()
+      throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    // Use a small block size so the file ends in a partial last block.
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512);
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(2).build();
+    HdfsDataInputStream in = null;
+    try {
+      Path path = new Path(MiniDFSCluster.getBaseDirectory(), "test");
+      DistributedFileSystem dfs = (DistributedFileSystem) cluster
+          .getFileSystem();
+      FSDataOutputStream out = dfs.create(path);
+      int fileLength = 1030;
+      out.write(new byte[fileLength]);
+      out.hsync();
+      cluster.restartNameNode();
+      cluster.waitActive();
+      in = (HdfsDataInputStream) dfs.open(path, 1024);
+      // Verify the length when we just restart NN. DNs will register
+      // immediately.
+      Assert.assertEquals(fileLength, in.getVisibleLength());
+      cluster.shutdownDataNodes();
+      cluster.restartNameNode(false);
+      // This is just to ensure that the NN has started.
+      verifyNNIsInSafeMode(dfs);
+
+      try {
+        in = (HdfsDataInputStream) dfs.open(path);
+        Assert.fail("Expected IOException");
+      } catch (IOException e) {
+        Assert.assertEquals("Could not obtain the last block locations.", e
+            .getLocalizedMessage());
+      }
+    } finally {
+      if (null != in) {
+        in.close();
+      }
+      cluster.shutdown();
+
+    }
+  }
+
+  private void verifyNNIsInSafeMode(DistributedFileSystem dfs)
+      throws IOException {
+    while (true) {
+      try {
+        if (dfs.isInSafeMode()) {
+          return;
+        } else {
+          throw new IOException("Expected to be in SafeMode");
+        }
+      } catch (IOException e) {
+        // NN might not have started completely; ignore and retry.
+      }
+    }
+  }
+}
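
For reference, the retry behaviour this patch adds to DFSInputStream#openInfo() can be exercised in isolation. The sketch below is illustrative only and is not part of the patch: the class LastBlockLengthRetryExample, the method openWithRetries, and the LongSupplier stand-in for fetchLocatedBlocksAndGetLastBlockLength() are hypothetical names. It mirrors the same constants (3 retries, a 4000 ms wait) and the -1 sentinel meaning "last partial block has no reported locations yet".

import java.io.IOException;
import java.util.function.LongSupplier;

/**
 * Illustrative sketch only (not part of the patch): mirrors the retry loop
 * added to DFSInputStream#openInfo(). The supplier stands in for
 * fetchLocatedBlocksAndGetLastBlockLength(), which returns -1 while the
 * NameNode has no locations for the last block being written.
 */
public class LastBlockLengthRetryExample {

  static long openWithRetries(LongSupplier fetchLastBlockLength)
      throws IOException {
    long length = fetchLastBlockLength.getAsLong();
    int retriesForLastBlockLength = 3;
    while (retriesForLastBlockLength > 0) {
      if (length == -1) {
        // Same handling as the patch: warn, wait 4 seconds, fetch again.
        System.out.println("Last block locations not available. Retries left: "
            + retriesForLastBlockLength);
        try {
          Thread.sleep(4000);
        } catch (InterruptedException e) {
          throw new IOException(
              "Interrupted while getting the last block length.");
        }
        length = fetchLastBlockLength.getAsLong();
      } else {
        break;
      }
      retriesForLastBlockLength--;
    }
    if (retriesForLastBlockLength == 0) {
      throw new IOException("Could not obtain the last block locations.");
    }
    return length;
  }

  public static void main(String[] args) throws IOException {
    // Simulate a DataNode that reports its blocks before the second fetch,
    // as happens shortly after a NameNode restart.
    final int[] calls = {0};
    long len = openWithRetries(() -> ++calls[0] < 2 ? -1 : 1030);
    System.out.println("Resolved last block length: " + len);
  }
}

Running main simulates a DN registering roughly 4 seconds after the restart, so the length resolves to 1030 on the second fetch; if every fetch kept returning -1, the loop would give up and throw the same "Could not obtain the last block locations." IOException that the new test expects.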