HDFS-11674. reserveSpaceForReplicas is not released if append request failed due to mirror down and replica recovered (Contributed by Vinayakumar B)
commit b0b6de38c9
parent 28b947603b
@@ -278,6 +278,9 @@ Release 2.7.4 - UNRELEASED
     HDFS-11795. Fix ASF License warnings in branch-2.7.
     (Yiqun Lin via aajisaka)
 
+    HDFS-11674. reserveSpaceForReplicas is not released if append request failed
+    due to mirror down and replica recovered (vinayakumarb)
+
 Release 2.7.3 - 2016-08-25
 
   INCOMPATIBLE CHANGES
@@ -426,6 +426,21 @@ public class DFSOutputStream extends FSOutputSummer
     }
   }
 
+  /**
+   * Set pipeline in construction
+   *
+   * @param lastBlock the last block of a file
+   * @throws IOException
+   */
+  void setPipelineInConstruction(LocatedBlock lastBlock) throws IOException{
+    // setup pipeline to append to the last block XXX retries??
+    setPipeline(lastBlock);
+    if (nodes.length < 1) {
+      throw new IOException("Unable to retrieve blocks locations " +
+          " for last block " + block + " of file " + src);
+    }
+  }
+
   private void setPipeline(LocatedBlock lb) {
     setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
   }
@@ -2497,6 +2512,13 @@ public class DFSOutputStream extends FSOutputSummer
     return fileId;
   }
 
+  /**
+   * Returns the data streamer object.
+   */
+  protected DataStreamer getStreamer() {
+    return streamer;
+  }
+
   private static <T> void arraycopy(T[] srcs, T[] dsts, int skipIndex) {
     System.arraycopy(srcs, 0, dsts, 0, skipIndex);
     System.arraycopy(srcs, skipIndex+1, dsts, skipIndex, dsts.length-skipIndex);
@@ -2359,6 +2359,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       LOG.info("initReplicaRecovery: changing replica state for "
           + block + " from " + replica.getState()
          + " to " + rur.getState());
+      if (replica.getState() == ReplicaState.TEMPORARY || replica
+          .getState() == ReplicaState.RBW) {
+        ((ReplicaInPipeline) replica).releaseAllBytesReserved();
+      }
     }
     return rur.createInfo();
   }
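Note: the FsDatasetImpl change above is the substantive fix. When initReplicaRecovery converts a replica that is still TEMPORARY or RBW into a replica-under-recovery, the bytes the DataNode volume had reserved for that in-flight replica are now given back via ReplicaInPipeline#releaseAllBytesReserved, so a failed append followed by block recovery no longer leaks reserved space. Below is a minimal, self-contained sketch of that accounting; it is not HDFS code, and every class and field name in it is invented for illustration.

    /** Toy model of per-volume reserved-space accounting during replica recovery. */
    class ReservedSpaceModel {
      enum State { TEMPORARY, RBW, RUR, FINALIZED }

      static class Volume {
        long reservedForReplicas;                   // bytes reserved for in-flight replicas
        void release(long bytes) { reservedForReplicas -= bytes; }
      }

      static class Replica {
        State state;
        long bytesReserved;                         // reserved when the write began
        final Volume volume;
        Replica(Volume v, State s, long reserved) {
          volume = v; state = s; bytesReserved = reserved;
        }
        // Plays the role of ReplicaInPipeline#releaseAllBytesReserved.
        void releaseAllBytesReserved() {
          volume.release(bytesReserved);
          bytesReserved = 0;
        }
      }

      // Plays the role of the initReplicaRecovery change above: give the
      // reservation back before the replica becomes "under recovery".
      static void initReplicaRecovery(Replica replica) {
        if (replica.state == State.TEMPORARY || replica.state == State.RBW) {
          replica.releaseAllBytesReserved();        // the release added by HDFS-11674
        }
        replica.state = State.RUR;
      }

      public static void main(String[] args) {
        Volume v = new Volume();
        Replica r = new Replica(v, State.RBW, 128L * 1024 * 1024);
        v.reservedForReplicas = r.bytesReserved;    // space grabbed when the append started
        initReplicaRecovery(r);                     // recovery after the failed pipeline
        System.out.println("reserved after recovery = " + v.reservedForReplicas); // prints 0
      }
    }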
@@ -1355,6 +1355,11 @@ public class DFSTestUtil {
     out.abort();
   }
 
+  public static void setPipeline(DFSOutputStream out, LocatedBlock lastBlock)
+      throws IOException {
+    out.getStreamer().setPipelineInConstruction(lastBlock);
+  }
+
   public static byte[] asArray(ByteBuffer buf) {
     byte arr[] = new byte[buf.remaining()];
     buf.duplicate().get(arr);
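Note: DFSTestUtil.setPipeline is a test-only hook into the package-private DataStreamer setPipelineInConstruction added earlier in this diff; it lets a test rebuild an append stream's pipeline from a known LocatedBlock, so the write is forced onto a pipeline that still contains the stopped mirror. The following is a condensed sketch of how a test might drive it (a simplified happy-path version of the testReservedSpaceForLeaseRecovery test later in this commit; the sketch class and method names are invented):

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DFSOutputStream;
    import org.apache.hadoop.hdfs.DFSTestUtil;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;

    /** Sketch: force an append onto a previously recorded pipeline. */
    class ForcedPipelineAppendSketch {
      static void appendOnKnownPipeline(DistributedFileSystem fs, Path file,
          LocatedBlock lastBlock) throws IOException {
        FSDataOutputStream out = fs.append(file);
        // For an HDFS append the wrapped stream is a DFSOutputStream.
        DFSOutputStream dfsOut = (DFSOutputStream) out.getWrappedStream();
        // Rebuild the streamer's pipeline from the known last block; this
        // delegates to DataStreamer#setPipelineInConstruction.
        DFSTestUtil.setPipeline(dfsOut, lastBlock);
        out.writeBytes("hi");   // data now targets the forced pipeline
        out.hsync();
        out.close();
      }
    }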
@@ -30,10 +30,13 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.HdfsBlockLocation;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.*;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -43,6 +46,7 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Daemon;
 import org.apache.log4j.Level;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -76,9 +80,12 @@ public class TestRbwSpaceReservation {
   private DataNodeFaultInjector old = null;
   private static Random rand = new Random();
 
-  private void initConfig(int blockSize) {
+  @Before
+  public void before() {
     conf = new HdfsConfiguration();
+  }
 
+  private void initConfig(int blockSize) {
     // Refresh disk usage information frequently.
     conf.setInt(FS_DU_INTERVAL_KEY, DU_REFRESH_INTERVAL_MSEC);
     conf.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
@@ -523,4 +530,62 @@ public class TestRbwSpaceReservation {
       }
     }
   }
 
+  @Test(timeout = 60000)
+  public void testReservedSpaceForLeaseRecovery() throws Exception {
+    final short replication = 3;
+    conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 2);
+    conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY,
+        1000);
+    startCluster(BLOCK_SIZE, replication, -1);
+
+    final String methodName = GenericTestUtils.getMethodName();
+    final Path file = new Path("/" + methodName + ".01.dat");
+    // Write to the file and kill the writer.
+    FSDataOutputStream os = fs.create(file, replication);
+    os.write(new byte[8192]);
+    os.hflush();
+    os.close();
+    /*
+     * Reset the pipeline for the append in such a way that, datanode which is
+     * down is one of the mirror, not the first datanode.
+     */
+    HdfsBlockLocation blockLocation = (HdfsBlockLocation) fs.getClient()
+        .getBlockLocations(file.toString(), 0, BLOCK_SIZE)[0];
+    LocatedBlock lastBlock = blockLocation.getLocatedBlock();
+    // stop 3rd node.
+    cluster.stopDataNode(lastBlock.getLocations()[2].getName());
+    try {
+      os = fs.append(file);
+      DFSTestUtil.setPipeline((DFSOutputStream) os.getWrappedStream(),
+          lastBlock);
+      os.writeBytes("hi");
+      os.hsync();
+    } catch (IOException e) {
+      // Append will fail due to not able to replace datanodes in 3 nodes
+      // cluster.
+      LOG.info("", e);
+    }
+    DFSTestUtil.abortStream((DFSOutputStream) os.getWrappedStream());
+    /*
+     * There is a chance that stopped DN could be chosen as primary for
+     * recovery. If so, then recovery will not happen in time. So mark stopped
+     * node as dead to exclude that node.
+     */
+    cluster.setDataNodeDead(lastBlock.getLocations()[2]);
+    fs.recoverLease(file);
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          return fs.isFileClosed(file);
+        } catch (IOException e) {
+          return false;
+        }
+      }
+    }, 500, 30000);
+    checkReservedSpace(0);
+  }
 }