HDFS-8072. Reserved RBW space is not released if client terminates while writing block. (Arpit Agarwal)

Arpit Agarwal 2015-04-08 11:38:21 -07:00
parent 5f59e621ba
commit f0324738c9
7 changed files with 81 additions and 9 deletions
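
Background for the change: when a DataNode creates a replica being written (RBW), it reserves a full block's worth of disk space on the target volume up front and releases the reservation when the block is finalized. Before this patch, the BlockReceiver error path did not release the reservation, so a client that terminated mid-write could leave the space reserved indefinitely. The sketch below shows the reserve/release pairing this patch completes; the class and method names are illustrative stand-ins, not the actual Hadoop code.

import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-ins for FsVolumeImpl and ReplicaInPipeline.
class Volume {
  private final AtomicLong reservedForRbw = new AtomicLong();

  void reserveSpaceForRbw(long bytes) { reservedForRbw.addAndGet(bytes); }
  void releaseReservedSpace(long bytes) { reservedForRbw.addAndGet(-bytes); }
  long getReservedForRbw() { return reservedForRbw.get(); }
}

class Replica {
  private final Volume volume;
  private long bytesReserved;

  Replica(Volume volume, long blockSize) {
    this.volume = volume;
    this.bytesReserved = blockSize;
    volume.reserveSpaceForRbw(blockSize); // a full block is reserved up front
  }

  // The method this patch adds: hand the outstanding reservation back to the
  // volume, then zero it so a later call cannot release the same bytes twice.
  void releaseAllBytesReserved() {
    volume.releaseReservedSpace(bytesReserved);
    bytesReserved = 0;
  }
}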

CHANGES.txt

@@ -1102,6 +1102,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-8038. PBImageDelimitedTextWriter#getEntry output HDFS path in
     platform-specific format. (Xiaoyu Yao via cnauroth)
 
+    HDFS-8072. Reserved RBW space is not released if client terminates while
+    writing block. (Arpit Agarwal)
+
   BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
 
     HDFS-7720. Quota by Storage Type API, tools and ClientNameNode

BlockReceiver.java

@@ -817,6 +817,7 @@ class BlockReceiver implements Closeable {
         }
       } catch (IOException ioe) {
+        replicaInfo.releaseAllBytesReserved();
        if (datanode.isRestarting()) {
          // Do not throw if shutting down for restart. Otherwise, it will cause
          // premature termination of responder.

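The one-line BlockReceiver change above hooks the release into the receiver's IOException handler, so the reservation is returned the moment the write fails instead of leaking. A simplified view of that control flow, reusing the Replica sketch from the note above (the surrounding class and fields here are invented for illustration):

import java.io.IOException;

class ReceiverSketch {
  private final Replica replicaInfo;
  private final boolean restarting;

  ReceiverSketch(Replica replicaInfo, boolean restarting) {
    this.replicaInfo = replicaInfo;
    this.restarting = restarting;
  }

  void receiveBlock() throws IOException {
    try {
      // ... receive packets from the client until the block completes ...
      throw new IOException("client terminated mid-write"); // simulated failure
    } catch (IOException ioe) {
      replicaInfo.releaseAllBytesReserved(); // the line added by this patch
      if (!restarting) {
        throw ioe; // only suppressed while the DataNode is shutting down for restart
      }
    }
  }
}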
ReplicaInPipeline.java

@@ -148,6 +148,12 @@ public class ReplicaInPipeline extends ReplicaInfo
     return bytesReserved;
   }
 
+  @Override
+  public void releaseAllBytesReserved() {  // ReplicaInPipelineInterface
+    getVolume().releaseReservedSpace(bytesReserved);
+    bytesReserved = 0;
+  }
+
   @Override // ReplicaInPipelineInterface
   public synchronized void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum) {
     this.bytesOnDisk = dataLength;

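Note that releaseAllBytesReserved zeroes bytesReserved after handing the space back, which makes the release idempotent: if the error path fires first and later cleanup releases again, the second call returns zero bytes. A quick check against the hypothetical Volume/Replica sketch from the introductory note:

public class ReleaseIdempotenceDemo {
  public static void main(String[] args) {
    Volume volume = new Volume();
    Replica replica = new Replica(volume, 1024 * 1024); // reserve one 1 MB block
    replica.releaseAllBytesReserved();
    replica.releaseAllBytesReserved(); // no-op: bytesReserved was zeroed
    System.out.println(volume.getReservedForRbw()); // prints 0, not -1048576
  }
}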
ReplicaInPipelineInterface.java

@@ -44,6 +44,11 @@ public interface ReplicaInPipelineInterface extends Replica {
    */
   void setBytesAcked(long bytesAcked);
 
+  /**
+   * Release any disk space reserved for this replica.
+   */
+  public void releaseAllBytesReserved();
+
   /**
    * store the checksum for the last chunk along with the data length
    * @param dataLength number of bytes on disk

SimulatedFSDataset.java

@@ -289,6 +289,10 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       }
     }
 
+    @Override
+    public void releaseAllBytesReserved() {
+    }
+
     @Override
     synchronized public long getBytesOnDisk() {
       if (finalized) {

ExternalReplicaInPipeline.java

@@ -40,6 +40,10 @@ public class ExternalReplicaInPipeline implements ReplicaInPipelineInterface {
   public void setBytesAcked(long bytesAcked) {
   }
 
+  @Override
+  public void releaseAllBytesReserved() {
+  }
+
   @Override
   public void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum) {
   }

TestRbwSpaceReservation.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
+import com.google.common.base.Supplier;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,6 +46,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.TimeoutException;
 
 /**
  * Ensure that the DN reserves disk space equivalent to a full block for
@@ -53,7 +55,6 @@ import java.util.Random;
  */
 public class TestRbwSpaceReservation {
   static final Log LOG = LogFactory.getLog(TestRbwSpaceReservation.class);
-  private static final short REPL_FACTOR = 1;
   private static final int DU_REFRESH_INTERVAL_MSEC = 500;
   private static final int STORAGES_PER_DATANODE = 1;
   private static final int BLOCK_SIZE = 1024 * 1024;
@@ -83,25 +84,38 @@ public class TestRbwSpaceReservation {
     ((Log4JLogger) DataNode.LOG).getLogger().setLevel(Level.ALL);
   }
 
-  private void startCluster(int blockSize, long perVolumeCapacity) throws IOException {
+  /**
+   *
+   * @param blockSize
+   * @param perVolumeCapacity limit the capacity of each volume to the given
+   *                          value. If negative, then don't limit.
+   * @throws IOException
+   */
+  private void startCluster(int blockSize, int numDatanodes, long perVolumeCapacity) throws IOException {
     initConfig(blockSize);
 
     cluster = new MiniDFSCluster
         .Builder(conf)
         .storagesPerDatanode(STORAGES_PER_DATANODE)
-        .numDataNodes(REPL_FACTOR)
+        .numDataNodes(numDatanodes)
         .build();
     fs = cluster.getFileSystem();
     client = fs.getClient();
     cluster.waitActive();
 
     if (perVolumeCapacity >= 0) {
+      for (DataNode dn : cluster.getDataNodes()) {
+        for (FsVolumeSpi volume : dn.getFSDataset().getVolumes()) {
+          ((FsVolumeImpl) volume).setCapacityForTesting(perVolumeCapacity);
+        }
+      }
+    }
+
+    if (numDatanodes == 1) {
       List<? extends FsVolumeSpi> volumes =
           cluster.getDataNodes().get(0).getFSDataset().getVolumes();
 
       assertThat(volumes.size(), is(1));
       singletonVolume = ((FsVolumeImpl) volumes.get(0));
-      singletonVolume.setCapacityForTesting(perVolumeCapacity);
     }
   }
@@ -128,7 +142,7 @@
       throws IOException, InterruptedException {
     // Enough for 1 block + meta files + some delta.
     final long configuredCapacity = fileBlockSize * 2 - 1;
-    startCluster(BLOCK_SIZE, configuredCapacity);
+    startCluster(BLOCK_SIZE, 1, configuredCapacity);
 
     FSDataOutputStream out = null;
     Path path = new Path("/" + fileNamePrefix + ".dat");
@@ -195,7 +209,7 @@
   @Test (timeout=300000)
   public void testWithLimitedSpace() throws IOException {
     // Cluster with just enough space for a full block + meta.
-    startCluster(BLOCK_SIZE, 2 * BLOCK_SIZE - 1);
+    startCluster(BLOCK_SIZE, 1, 2 * BLOCK_SIZE - 1);
     final String methodName = GenericTestUtils.getMethodName();
     Path file1 = new Path("/" + methodName + ".01.dat");
     Path file2 = new Path("/" + methodName + ".02.dat");
@@ -208,7 +222,6 @@
     os2 = fs.create(file2);
 
     // Write one byte to the first file.
-    LOG.info("arpit: writing first file");
     byte[] data = new byte[1];
     os1.write(data);
     os1.hsync();
@@ -227,6 +240,42 @@
     }
   }
 
+  /**
+   * Ensure that reserved space is released when the client goes away
+   * unexpectedly.
+   *
+   * The verification is done for each replica in the write pipeline.
+   *
+   * @throws IOException
+   */
+  @Test(timeout=300000)
+  public void testSpaceReleasedOnUnexpectedEof()
+      throws IOException, InterruptedException, TimeoutException {
+    final short replication = 3;
+    startCluster(BLOCK_SIZE, replication, -1);
+
+    final String methodName = GenericTestUtils.getMethodName();
+    final Path file = new Path("/" + methodName + ".01.dat");
+
+    // Write 1 byte to the file and kill the writer.
+    FSDataOutputStream os = fs.create(file, replication);
+    os.write(new byte[1]);
+    os.hsync();
+    DFSTestUtil.abortStream((DFSOutputStream) os.getWrappedStream());
+
+    // Ensure all space reserved for the replica was released on each
+    // DataNode.
+    for (DataNode dn : cluster.getDataNodes()) {
+      final FsVolumeImpl volume = (FsVolumeImpl) dn.getFSDataset().getVolumes().get(0);
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return (volume.getReservedForRbw() == 0);
+        }
+      }, 500, Integer.MAX_VALUE); // Wait until the test times out.
+    }
+  }
+
   /**
    * Stress test to ensure we are not leaking reserved space.
    * @throws IOException
@@ -235,7 +284,7 @@
   @Test (timeout=600000)
   public void stressTest() throws IOException, InterruptedException {
     final int numWriters = 5;
-    startCluster(SMALL_BLOCK_SIZE, SMALL_BLOCK_SIZE * numWriters * 10);
+    startCluster(SMALL_BLOCK_SIZE, 1, SMALL_BLOCK_SIZE * numWriters * 10);
     Writer[] writers = new Writer[numWriters];
 
     // Start a few writers and let them run for a while.
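
A note on the new testSpaceReleasedOnUnexpectedEof test above: it polls each DataNode's volume with GenericTestUtils.waitFor, passing Integer.MAX_VALUE as the timeout so that the JUnit @Test(timeout=300000) annotation is what actually bounds the wait. A minimal stand-in for that polling helper, using nothing beyond the JDK (the real helper lives in org.apache.hadoop.test.GenericTestUtils):

import java.util.concurrent.TimeoutException;

final class WaitFor {
  interface Check { boolean ok(); }

  // Re-evaluate the condition every checkEveryMillis until it holds or the
  // deadline passes, mirroring the waitFor(...) call in the test above.
  static void waitFor(Check check, long checkEveryMillis, long timeoutMillis)
      throws TimeoutException, InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (!check.ok()) {
      if (System.currentTimeMillis() > deadline) {
        throw new TimeoutException("condition not met after " + timeoutMillis + " ms");
      }
      Thread.sleep(checkEveryMillis);
    }
  }
}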