HDFS-11674. reserveSpaceForReplicas is not released if append request failed due to mirror down and replica recovered (Contributed by Vinayakumar B)

(cherry picked from commit 1411612aa4e70c704b941723217ed4efd8a0125b)
(cherry picked from commit 53d9f56a18214ff351384589ebfadd6afae2c62f)
This commit is contained in:
Vinayakumar B 2017-05-12 07:38:18 +05:30
parent aae0600a67
commit b121e047b4
3 changed files with 75 additions and 1 deletions

View File

@ -2558,6 +2558,10 @@ static ReplicaRecoveryInfo initReplicaRecoveryImpl(String bpid, ReplicaMap map,
LOG.info("initReplicaRecovery: changing replica state for "
+ block + " from " + replica.getState()
+ " to " + rur.getState());
if (replica.getState() == ReplicaState.TEMPORARY || replica
.getState() == ReplicaState.RBW) {
((ReplicaInPipeline) replica).releaseAllBytesReserved();
}
}
return rur.createInfo();
}

View File

@ -1412,6 +1412,11 @@ public static void abortStream(DFSOutputStream out) throws IOException {
out.abort();
}
public static void setPipeline(DFSOutputStream out, LocatedBlock lastBlock)
throws IOException {
out.getStreamer().setPipelineInConstruction(lastBlock);
}
public static byte[] asArray(ByteBuffer buf) {
byte arr[] = new byte[buf.remaining()];
buf.duplicate().get(arr);

View File

@ -31,10 +31,13 @@
import static org.junit.Assert.fail;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.HdfsBlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@ -44,6 +47,7 @@
import org.apache.hadoop.util.Daemon;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@ -82,9 +86,12 @@ public class TestSpaceReservation {
private static Random rand = new Random();
private void initConfig(int blockSize) {
@Before
public void before() {
conf = new HdfsConfiguration();
}
private void initConfig(int blockSize) {
// Refresh disk usage information frequently.
conf.setInt(FS_DU_INTERVAL_KEY, DU_REFRESH_INTERVAL_MSEC);
conf.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
@ -680,4 +687,62 @@ public Boolean get() {
}
}
}
@Test(timeout = 60000)
public void testReservedSpaceForLeaseRecovery() throws Exception {
final short replication = 3;
conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 2);
conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY,
1000);
startCluster(BLOCK_SIZE, replication, -1);
final String methodName = GenericTestUtils.getMethodName();
final Path file = new Path("/" + methodName + ".01.dat");
// Write to the file and kill the writer.
FSDataOutputStream os = fs.create(file, replication);
os.write(new byte[8192]);
os.hflush();
os.close();
/*
* Reset the pipeline for the append in such a way that, datanode which is
* down is one of the mirror, not the first datanode.
*/
HdfsBlockLocation blockLocation = (HdfsBlockLocation) fs.getClient()
.getBlockLocations(file.toString(), 0, BLOCK_SIZE)[0];
LocatedBlock lastBlock = blockLocation.getLocatedBlock();
// stop 3rd node.
cluster.stopDataNode(lastBlock.getLocations()[2].getName());
try {
os = fs.append(file);
DFSTestUtil.setPipeline((DFSOutputStream) os.getWrappedStream(),
lastBlock);
os.writeBytes("hi");
os.hsync();
} catch (IOException e) {
// Append will fail due to not able to replace datanodes in 3 nodes
// cluster.
LOG.info("", e);
}
DFSTestUtil.abortStream((DFSOutputStream) os.getWrappedStream());
/*
* There is a chance that stopped DN could be chosen as primary for
* recovery. If so, then recovery will not happen in time. So mark stopped
* node as dead to exclude that node.
*/
cluster.setDataNodeDead(lastBlock.getLocations()[2]);
fs.recoverLease(file);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
return fs.isFileClosed(file);
} catch (IOException e) {
return false;
}
}
}, 500, 30000);
checkReservedSpace(0);
}
}