HDFS-11674. reserveSpaceForReplicas is not released if append request failed due to mirror down and replica recovered (Contributed by Vinayakumar B)

This commit is contained in:
Vinayakumar B 2017-05-12 07:38:18 +05:30 committed by Xiaoyu Yao
parent ccdcc34490
commit be303c2990
3 changed files with 75 additions and 1 deletions

View File

@ -2455,6 +2455,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
LOG.info("initReplicaRecovery: changing replica state for " LOG.info("initReplicaRecovery: changing replica state for "
+ block + " from " + replica.getState() + block + " from " + replica.getState()
+ " to " + rur.getState()); + " to " + rur.getState());
if (replica.getState() == ReplicaState.TEMPORARY || replica
.getState() == ReplicaState.RBW) {
((ReplicaInPipeline) replica).releaseAllBytesReserved();
}
} }
return rur.createInfo(); return rur.createInfo();
} }

View File

@ -1462,6 +1462,11 @@ public class DFSTestUtil {
out.abort(); out.abort();
} }
public static void setPipeline(DFSOutputStream out, LocatedBlock lastBlock)
throws IOException {
out.getStreamer().setPipelineInConstruction(lastBlock);
}
public static byte[] asArray(ByteBuffer buf) { public static byte[] asArray(ByteBuffer buf) {
byte arr[] = new byte[buf.remaining()]; byte arr[] = new byte[buf.remaining()];
buf.duplicate().get(arr); buf.duplicate().get(arr);

View File

@ -31,10 +31,13 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.HdfsBlockLocation;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.*; import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector; import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@ -44,6 +47,7 @@ import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.junit.After; import org.junit.After;
import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException; import org.junit.rules.ExpectedException;
@ -82,9 +86,12 @@ public class TestSpaceReservation {
private static Random rand = new Random(); private static Random rand = new Random();
private void initConfig(int blockSize) { @Before
public void before() {
conf = new HdfsConfiguration(); conf = new HdfsConfiguration();
}
private void initConfig(int blockSize) {
// Refresh disk usage information frequently. // Refresh disk usage information frequently.
conf.setInt(FS_DU_INTERVAL_KEY, DU_REFRESH_INTERVAL_MSEC); conf.setInt(FS_DU_INTERVAL_KEY, DU_REFRESH_INTERVAL_MSEC);
conf.setLong(DFS_BLOCK_SIZE_KEY, blockSize); conf.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
@ -680,4 +687,62 @@ public class TestSpaceReservation {
} }
} }
} }
@Test(timeout = 60000)
public void testReservedSpaceForLeaseRecovery() throws Exception {
final short replication = 3;
conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 2);
conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY,
1000);
startCluster(BLOCK_SIZE, replication, -1);
final String methodName = GenericTestUtils.getMethodName();
final Path file = new Path("/" + methodName + ".01.dat");
// Write to the file and kill the writer.
FSDataOutputStream os = fs.create(file, replication);
os.write(new byte[8192]);
os.hflush();
os.close();
/*
* Reset the pipeline for the append in such a way that, datanode which is
* down is one of the mirror, not the first datanode.
*/
HdfsBlockLocation blockLocation = (HdfsBlockLocation) fs.getClient()
.getBlockLocations(file.toString(), 0, BLOCK_SIZE)[0];
LocatedBlock lastBlock = blockLocation.getLocatedBlock();
// stop 3rd node.
cluster.stopDataNode(lastBlock.getLocations()[2].getName());
try {
os = fs.append(file);
DFSTestUtil.setPipeline((DFSOutputStream) os.getWrappedStream(),
lastBlock);
os.writeBytes("hi");
os.hsync();
} catch (IOException e) {
// Append will fail due to not able to replace datanodes in 3 nodes
// cluster.
LOG.info("", e);
}
DFSTestUtil.abortStream((DFSOutputStream) os.getWrappedStream());
/*
* There is a chance that stopped DN could be chosen as primary for
* recovery. If so, then recovery will not happen in time. So mark stopped
* node as dead to exclude that node.
*/
cluster.setDataNodeDead(lastBlock.getLocations()[2]);
fs.recoverLease(file);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
return fs.isFileClosed(file);
} catch (IOException e) {
return false;
}
}
}, 500, 30000);
checkReservedSpace(0);
}
} }