HDFS-9530. ReservedSpace is not cleared for abandoned Blocks (Contributed by Brahma Reddy Battula)
parent 4a498f5fa1
commit 6a1fc34595
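When a replica joins the write pipeline, the receiving DataNode volume reserves a full block of disk space up front and releases it as the replica is finalized. Before this patch, a replica abandoned mid-pipeline (for example when pipeline recovery moves the block to other nodes) was deleted without returning its reservation, so the volume's reserved-for-RBW counter leaked. Below is a minimal, self-contained sketch of that accounting and of the releaseAllBytesReserved() call the patch wires into invalidate(); Volume and ReplicaInPipeline here are simplified stand-ins, not the real Hadoop classes.

// Simplified stand-ins for FsVolumeImpl / ReplicaInPipeline, illustrating
// the reserved-space accounting this patch fixes. Not the actual Hadoop code.
import java.util.concurrent.atomic.AtomicLong;

class Volume {
  private final AtomicLong reservedForRbw = new AtomicLong(0);

  void reserveSpaceForRbw(long bytes) {
    reservedForRbw.addAndGet(bytes);
  }

  void releaseReservedSpace(long bytes) {
    reservedForRbw.addAndGet(-bytes);
  }

  long getReservedForRbw() {
    return reservedForRbw.get();
  }
}

class ReplicaInPipeline {
  private final Volume volume;
  private long bytesReserved;

  ReplicaInPipeline(Volume volume, long blockSize) {
    this.volume = volume;
    this.bytesReserved = blockSize;
    volume.reserveSpaceForRbw(blockSize); // reserve a full block up front
  }

  // The call the patch adds on the invalidate() path: without it, an
  // abandoned replica leaves bytesReserved counted against the volume.
  void releaseAllBytesReserved() {
    volume.releaseReservedSpace(bytesReserved);
    bytesReserved = 0;
  }
}

public class ReservedSpaceSketch {
  public static void main(String[] args) {
    Volume v = new Volume();
    ReplicaInPipeline replica = new ReplicaInPipeline(v, 1024);
    // Client abandons the block, e.g. pipeline recovery picked new nodes.
    replica.releaseAllBytesReserved();
    // Before the patch this stayed at 1024 forever; now it returns to 0.
    System.out.println("reservedForRbw = " + v.getReservedForRbw());
  }
}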
@@ -201,6 +201,9 @@ Release 2.7.3 - UNRELEASED
     HDFS-10474. hftp copy fails when file name with Chinese+special char
     in branch-2 (Brahma Reddy Battula)
 
+    HDFS-9530. ReservedSpace is not cleared for abandoned Blocks
+    (Brahma Reddy Battula)
+
 Release 2.7.2 - 2016-01-25
 
   INCOMPATIBLE CHANGES
@@ -53,4 +53,6 @@ public class DataNodeFaultInjector {
   public void stopSendingPacketDownstream() throws IOException {}
 
   public void noRegistration() throws IOException { }
+
+  public void failMirrorConnection() throws IOException { }
 }
@@ -697,6 +697,7 @@ class DataXceiver extends Receiver implements Runnable {
       mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
       mirrorSock = datanode.newSocket();
       try {
+        DataNodeFaultInjector.get().failMirrorConnection();
         int timeoutValue = dnConf.socketTimeout
             + (HdfsServerConstants.READ_TIMEOUT_EXTENSION * targets.length);
         int writeTimeout = dnConf.socketWriteTimeout +
@@ -81,6 +81,7 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;

@@ -1848,6 +1849,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         LOG.debug("Block file " + removing.getBlockFile().getName()
             + " is to be deleted");
       }
+      if (removing instanceof ReplicaInPipelineInterface) {
+        ((ReplicaInPipelineInterface) removing).releaseAllBytesReserved();
+      }
     }
 
     if (v.isTransientStorage()) {
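The new failMirrorConnection() hook is what lets the regression test below force the first pipeline attempt to fail. As a rough sketch of the fault-injector pattern in play (simplified stand-in names, not the actual Hadoop class): production code calls a static no-op hook, and a test swaps in a failing override, then restores the original.

// Simplified stand-in for DataNodeFaultInjector, showing the set/override/
// restore pattern the test below uses. Not the actual Hadoop class.
import java.io.IOException;

class FaultInjector {
  private static FaultInjector instance = new FaultInjector();

  static FaultInjector get() { return instance; }
  static void set(FaultInjector injector) { instance = injector; }

  // No-op in production; tests override this to inject a failure.
  void failMirrorConnection() throws IOException { }
}

public class InjectorSketch {
  public static void main(String[] args) {
    FaultInjector old = FaultInjector.get();
    FaultInjector.set(new FaultInjector() {
      private int tries = 0;

      @Override
      void failMirrorConnection() throws IOException {
        if (tries++ == 0) { // fail only the first attempt
          throw new IOException("injected mirror failure");
        }
      }
    });
    try {
      // First call fails, forcing the caller down its recovery path;
      // the retry succeeds, just as in the test's pipeline recovery.
      for (int attempt = 1; attempt <= 2; attempt++) {
        try {
          FaultInjector.get().failMirrorConnection();
          System.out.println("attempt " + attempt + ": connected");
        } catch (IOException e) {
          System.out.println("attempt " + attempt + ": " + e.getMessage());
        }
      }
    } finally {
      FaultInjector.set(old); // always restore the real injector
    }
  }
}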
@@ -35,6 +35,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.*;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.test.GenericTestUtils;

@@ -71,7 +73,7 @@ public class TestRbwSpaceReservation {
   private DistributedFileSystem fs = null;
   private DFSClient client = null;
   FsVolumeImpl singletonVolume = null;
-
+  private DataNodeFaultInjector old = null;
   private static Random rand = new Random();
 
   private void initConfig(int blockSize) {
@@ -459,6 +461,50 @@ public class TestRbwSpaceReservation {
     checkReservedSpace(expectedFile2Reserved);
   }
 
+  @Test(timeout = 30000)
+  public void testReservedSpaceForPipelineRecovery()
+      throws Exception {
+    final short replication = 3;
+    startCluster(BLOCK_SIZE, replication, -1);
+
+    final String methodName = GenericTestUtils.getMethodName();
+    final Path file = new Path("/" + methodName + ".01.dat");
+
+    old = DataNodeFaultInjector.get();
+    // Fault injector to fail connection to mirror first time.
+    DataNodeFaultInjector.set(new DataNodeFaultInjector() {
+      private int tries = 0;
+
+      @Override
+      public void failMirrorConnection() throws IOException {
+        if (tries++ == 0) {
+          throw new IOException("Failing Mirror for space reservation");
+        }
+      }
+    });
+    // Write 1 byte to the file and kill the writer.
+    FSDataOutputStream os = fs.create(file, replication);
+    os.write(new byte[1]);
+    os.close();
+    // Ensure all space reserved for the replica was released on each
+    // DataNode.
+    cluster.triggerBlockReports();
+    for (final DataNode dn : cluster.getDataNodes()) {
+      for (FsVolumeSpi fsVolume : dn.getFSDataset().getVolumes()) {
+        {
+          final FsVolumeImpl volume = (FsVolumeImpl) fsVolume;
+          GenericTestUtils.waitFor(new Supplier<Boolean>() {
+            @Override public Boolean get() {
+              LOG.info("dn " + dn.getDisplayName() + " space : " + volume
+                  .getReservedForRbw());
+              return (volume.getReservedForRbw() == 0);
+            }
+          }, 100, Integer.MAX_VALUE); // Wait until the test times out.
+        }
+      }
+    }
+  }
+
   private void checkReservedSpace(final long expectedReserved)
       throws TimeoutException, InterruptedException, IOException {
     for (final DataNode dn : cluster.getDataNodes()) {