HDFS-9530. ReservedSpace is not cleared for abandoned Blocks (Contributed by Brahma Reddy Battula)
(cherry picked from commitf2ac132d6a
) (cherry picked from commitee0f389ecd
)
This commit is contained in:
parent
33e6986ec9
commit
aa8f4cc48c
|
@ -53,4 +53,6 @@ public class DataNodeFaultInjector {
|
|||
public void stopSendingPacketDownstream() throws IOException {}
|
||||
|
||||
public void noRegistration() throws IOException { }
|
||||
|
||||
public void failMirrorConnection() throws IOException { }
|
||||
}
|
||||
|
|
|
@ -738,6 +738,9 @@ class DataXceiver extends Receiver implements Runnable {
|
|||
mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
|
||||
mirrorSock = datanode.newSocket();
|
||||
try {
|
||||
|
||||
DataNodeFaultInjector.get().failMirrorConnection();
|
||||
|
||||
int timeoutValue = dnConf.socketTimeout +
|
||||
(HdfsConstants.READ_TIMEOUT_EXTENSION * targets.length);
|
||||
int writeTimeout = dnConf.socketWriteTimeout +
|
||||
|
|
|
@ -84,6 +84,7 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
|
|||
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
|
||||
|
@ -1958,6 +1959,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
LOG.debug("Block file " + removing.getBlockFile().getName()
|
||||
+ " is to be deleted");
|
||||
}
|
||||
if (removing instanceof ReplicaInPipelineInterface) {
|
||||
((ReplicaInPipelineInterface) removing).releaseAllBytesReserved();
|
||||
}
|
||||
}
|
||||
|
||||
if (v.isTransientStorage()) {
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.hdfs.*;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
|
@ -77,6 +78,7 @@ public class TestSpaceReservation {
|
|||
private DFSClient client = null;
|
||||
FsVolumeReference singletonVolumeRef = null;
|
||||
FsVolumeImpl singletonVolume = null;
|
||||
private DataNodeFaultInjector old = null;
|
||||
|
||||
private static Random rand = new Random();
|
||||
|
||||
|
@ -146,6 +148,9 @@ public class TestSpaceReservation {
|
|||
cluster.shutdown();
|
||||
cluster = null;
|
||||
}
|
||||
if (old != null) {
|
||||
DataNodeFaultInjector.set(old);
|
||||
}
|
||||
}
|
||||
|
||||
private void createFileAndTestSpaceReservation(
|
||||
|
@ -613,6 +618,49 @@ public class TestSpaceReservation {
|
|||
checkReservedSpace(expectedFile2Reserved);
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testReservedSpaceForPipelineRecovery() throws Exception {
|
||||
final short replication = 3;
|
||||
startCluster(BLOCK_SIZE, replication, -1);
|
||||
|
||||
final String methodName = GenericTestUtils.getMethodName();
|
||||
final Path file = new Path("/" + methodName + ".01.dat");
|
||||
|
||||
old = DataNodeFaultInjector.get();
|
||||
// Fault injector to fail connection to mirror first time.
|
||||
DataNodeFaultInjector.set(new DataNodeFaultInjector() {
|
||||
private int tries = 0;
|
||||
|
||||
@Override
|
||||
public void failMirrorConnection() throws IOException {
|
||||
if (tries++ == 0) {
|
||||
throw new IOException("Failing Mirror for space reservation");
|
||||
}
|
||||
}
|
||||
});
|
||||
// Write 1 byte to the file and kill the writer.
|
||||
FSDataOutputStream os = fs.create(file, replication);
|
||||
os.write(new byte[1]);
|
||||
os.close();
|
||||
// Ensure all space reserved for the replica was released on each
|
||||
// DataNode.
|
||||
cluster.triggerBlockReports();
|
||||
for (final DataNode dn : cluster.getDataNodes()) {
|
||||
try (FsDatasetSpi.FsVolumeReferences volumes =
|
||||
dn.getFSDataset().getFsVolumeReferences()) {
|
||||
final FsVolumeImpl volume = (FsVolumeImpl) volumes.get(0);
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
LOG.info("dn " + dn.getDisplayName() + " space : "
|
||||
+ volume.getReservedForReplicas());
|
||||
return (volume.getReservedForReplicas() == 0);
|
||||
}
|
||||
}, 100, Integer.MAX_VALUE); // Wait until the test times out.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void checkReservedSpace(final long expectedReserved) throws TimeoutException,
|
||||
InterruptedException, IOException {
|
||||
for (final DataNode dn : cluster.getDataNodes()) {
|
||||
|
|
Loading…
Reference in New Issue