Merge HDFS-3222. DFSInputStream#openInfo should not silently get the length as 0 when locations length is zero for last partial block. Contributed by Uma Maheswara Rao G.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1331067 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
da0f9645a0
commit
16d6cbc9ef
|
@ -437,6 +437,9 @@ Release 2.0.0 - UNRELEASED
|
|||
HDFS-3319. Change DFSOutputStream to not to start a thread in constructors.
|
||||
(szetszwo)
|
||||
|
||||
HDFS-3222. DFSInputStream#openInfo should not silently get the length as 0
|
||||
when locations length is zero for last partial block. (umamahesh)
|
||||
|
||||
BREAKDOWN OF HDFS-1623 SUBTASKS
|
||||
|
||||
HDFS-2179. Add fencing framework and mechanisms for NameNode HA. (todd)
|
||||
|
|
|
@ -116,6 +116,39 @@ public class DFSInputStream extends FSInputStream {
|
|||
* Grab the open-file info from namenode
|
||||
*/
|
||||
synchronized void openInfo() throws IOException, UnresolvedLinkException {
|
||||
lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
|
||||
int retriesForLastBlockLength = 3;
|
||||
while (retriesForLastBlockLength > 0) {
|
||||
// Getting last block length as -1 is a special case. When cluster
|
||||
// restarts, DNs may not report immediately. At this time partial block
|
||||
// locations will not be available with NN for getting the length. Lets
|
||||
// retry for 3 times to get the length.
|
||||
if (lastBlockBeingWrittenLength == -1) {
|
||||
DFSClient.LOG.warn("Last block locations not available. "
|
||||
+ "Datanodes might not have reported blocks completely."
|
||||
+ " Will retry for " + retriesForLastBlockLength + " times");
|
||||
waitFor(4000);
|
||||
lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
retriesForLastBlockLength--;
|
||||
}
|
||||
if (retriesForLastBlockLength == 0) {
|
||||
throw new IOException("Could not obtain the last block locations.");
|
||||
}
|
||||
}
|
||||
|
||||
private void waitFor(int waitTime) throws IOException {
|
||||
try {
|
||||
Thread.sleep(waitTime);
|
||||
} catch (InterruptedException e) {
|
||||
throw new IOException(
|
||||
"Interrupted while getting the last block length.");
|
||||
}
|
||||
}
|
||||
|
||||
private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
|
||||
LocatedBlocks newInfo = DFSClient.callGetBlockLocations(dfsClient.namenode, src, 0, prefetchSize);
|
||||
if (DFSClient.LOG.isDebugEnabled()) {
|
||||
DFSClient.LOG.debug("newInfo = " + newInfo);
|
||||
|
@ -134,10 +167,13 @@ public class DFSInputStream extends FSInputStream {
|
|||
}
|
||||
}
|
||||
locatedBlocks = newInfo;
|
||||
lastBlockBeingWrittenLength = 0;
|
||||
long lastBlockBeingWrittenLength = 0;
|
||||
if (!locatedBlocks.isLastBlockComplete()) {
|
||||
final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
|
||||
if (last != null) {
|
||||
if (last.getLocations().length == 0) {
|
||||
return -1;
|
||||
}
|
||||
final long len = readBlockLength(last);
|
||||
last.getBlock().setNumBytes(len);
|
||||
lastBlockBeingWrittenLength = len;
|
||||
|
@ -145,13 +181,12 @@ public class DFSInputStream extends FSInputStream {
|
|||
}
|
||||
|
||||
currentNode = null;
|
||||
return lastBlockBeingWrittenLength;
|
||||
}
|
||||
|
||||
/** Read the block length from one of the datanodes. */
|
||||
private long readBlockLength(LocatedBlock locatedblock) throws IOException {
|
||||
if (locatedblock == null || locatedblock.getLocations().length == 0) {
|
||||
return 0;
|
||||
}
|
||||
assert locatedblock != null : "LocatedBlock cannot be null";
|
||||
int replicaNotFoundCount = locatedblock.getLocations().length;
|
||||
|
||||
for(DatanodeInfo datanode : locatedblock.getLocations()) {
|
||||
|
|
|
@ -0,0 +1,94 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
/** Test the fileLength on cluster restarts */
|
||||
public class TestFileLengthOnClusterRestart {
|
||||
/**
|
||||
* Tests the fileLength when we sync the file and restart the cluster and
|
||||
* Datanodes not report to Namenode yet.
|
||||
*/
|
||||
@Test(timeout = 60000)
|
||||
public void testFileLengthWithHSyncAndClusterRestartWithOutDNsRegister()
|
||||
throws Exception {
|
||||
final Configuration conf = new HdfsConfiguration();
|
||||
// create cluster
|
||||
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512);
|
||||
|
||||
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(2).build();
|
||||
HdfsDataInputStream in = null;
|
||||
try {
|
||||
Path path = new Path(MiniDFSCluster.getBaseDirectory(), "test");
|
||||
DistributedFileSystem dfs = (DistributedFileSystem) cluster
|
||||
.getFileSystem();
|
||||
FSDataOutputStream out = dfs.create(path);
|
||||
int fileLength = 1030;
|
||||
out.write(new byte[fileLength]);
|
||||
out.hsync();
|
||||
cluster.restartNameNode();
|
||||
cluster.waitActive();
|
||||
in = (HdfsDataInputStream) dfs.open(path, 1024);
|
||||
// Verify the length when we just restart NN. DNs will register
|
||||
// immediately.
|
||||
Assert.assertEquals(fileLength, in.getVisibleLength());
|
||||
cluster.shutdownDataNodes();
|
||||
cluster.restartNameNode(false);
|
||||
// This is just for ensuring NN started.
|
||||
verifyNNIsInSafeMode(dfs);
|
||||
|
||||
try {
|
||||
in = (HdfsDataInputStream) dfs.open(path);
|
||||
Assert.fail("Expected IOException");
|
||||
} catch (IOException e) {
|
||||
Assert.assertEquals("Could not obtain the last block locations.", e
|
||||
.getLocalizedMessage());
|
||||
}
|
||||
} finally {
|
||||
if (null != in) {
|
||||
in.close();
|
||||
}
|
||||
cluster.shutdown();
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyNNIsInSafeMode(DistributedFileSystem dfs)
|
||||
throws IOException {
|
||||
while (true) {
|
||||
try {
|
||||
if (dfs.isInSafeMode()) {
|
||||
return;
|
||||
} else {
|
||||
throw new IOException("Expected to be in SafeMode");
|
||||
}
|
||||
} catch (IOException e) {
|
||||
// NN might not started completely Ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue