HDFS-6898. DN must reserve space for a full block when an RBW block is created. (Contributed by Arpit Agarwal)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
Author: arp
Date: 2014-09-06 20:02:40 -07:00
Parent: af1d389157
Commit: eba06e61ce
12 changed files with 421 additions and 20 deletions
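The mechanism is easiest to see end to end before reading the per-file diffs: reserve an estimated full block on the volume when an RBW replica is created, release incrementally as bytes are acknowledged, and release the remainder when the block is finalized. A minimal, self-contained sketch of that lifecycle (the class name and all numbers are invented for illustration; the real accounting lives in FsVolumeImpl below):

import java.util.concurrent.atomic.AtomicLong;

// Toy model of the RBW space-reservation lifecycle in this patch.
public class RbwReservationSketch {
  public static void main(String[] args) {
    AtomicLong reservedForRbw = new AtomicLong(0); // per-volume counter
    long blockSize = 128L * 1024 * 1024;           // estimated max block length

    reservedForRbw.addAndGet(blockSize);           // createRbwFile(): reserve

    long acked = 4096;                             // setBytesAcked(): release
    reservedForRbw.addAndGet(-acked);              //   only the acked bytes

    long remainder = blockSize - acked;            // getBytesReserved()
    reservedForRbw.addAndGet(-remainder);          // addFinalizedBlock(): rest

    assert reservedForRbw.get() == 0 : "leaked reservation";
  }
}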

File: CHANGES.txt

@@ -351,6 +351,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-6862. Add missing timeout annotations to tests. (Xiaoyu Yao via
     Arpit Agarwal)

+    HDFS-6898. DN must reserve space for a full block when an RBW block is
+    created. (Arpit Agarwal)
+
   BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS

     HDFS-6387. HDFS CLI admin tool for creating & deleting an

File: HdfsConstants.java

@@ -50,7 +50,7 @@ public class HdfsConstants {
       "org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol";

-  public static final int MIN_BLOCKS_FOR_WRITE = 5;
+  public static final int MIN_BLOCKS_FOR_WRITE = 1;

   // Long that indicates "leave current quota unchanged"
   public static final long QUOTA_DONT_SET = Long.MAX_VALUE;

File: ReplicaBeingWritten.java

@@ -34,10 +34,12 @@ public class ReplicaBeingWritten extends ReplicaInPipeline {
    * @param genStamp replica generation stamp
    * @param vol volume where replica is located
    * @param dir directory path where block and meta files are located
+   * @param bytesToReserve disk space to reserve for this replica, based on
+   *                       the estimated maximum block length.
    */
   public ReplicaBeingWritten(long blockId, long genStamp,
-      FsVolumeSpi vol, File dir) {
-    super( blockId, genStamp, vol, dir);
+      FsVolumeSpi vol, File dir, long bytesToReserve) {
+    super(blockId, genStamp, vol, dir, bytesToReserve);
   }

   /**
@@ -60,10 +62,12 @@ public class ReplicaBeingWritten extends ReplicaInPipeline {
    * @param vol volume where replica is located
    * @param dir directory path where block and meta files are located
    * @param writer a thread that is writing to this replica
+   * @param bytesToReserve disk space to reserve for this replica, based on
+   *                       the estimated maximum block length.
    */
   public ReplicaBeingWritten(long blockId, long len, long genStamp,
-      FsVolumeSpi vol, File dir, Thread writer ) {
-    super( blockId, len, genStamp, vol, dir, writer);
+      FsVolumeSpi vol, File dir, Thread writer, long bytesToReserve) {
+    super(blockId, len, genStamp, vol, dir, writer, bytesToReserve);
   }

   /**

File: ReplicaInPipeline.java

@@ -44,6 +44,13 @@ public class ReplicaInPipeline extends ReplicaInfo
   private long bytesOnDisk;
   private byte[] lastChecksum;
   private Thread writer;
+
+  /**
+   * Bytes reserved for this replica on the containing volume.
+   * Based on the difference between the estimated maximum block length
+   * and the bytes already written to this block.
+   */
+  private long bytesReserved;

   /**
    * Constructor for a zero length replica
@@ -51,10 +58,12 @@ public class ReplicaInPipeline extends ReplicaInfo
    * @param genStamp replica generation stamp
    * @param vol volume where replica is located
    * @param dir directory path where block and meta files are located
+   * @param bytesToReserve disk space to reserve for this replica, based on
+   *                       the estimated maximum block length.
    */
   public ReplicaInPipeline(long blockId, long genStamp,
-      FsVolumeSpi vol, File dir) {
-    this( blockId, 0L, genStamp, vol, dir, Thread.currentThread());
+      FsVolumeSpi vol, File dir, long bytesToReserve) {
+    this(blockId, 0L, genStamp, vol, dir, Thread.currentThread(), bytesToReserve);
   }

   /**
@@ -67,7 +76,7 @@ public class ReplicaInPipeline extends ReplicaInfo
   ReplicaInPipeline(Block block,
       FsVolumeSpi vol, File dir, Thread writer) {
     this( block.getBlockId(), block.getNumBytes(), block.getGenerationStamp(),
-        vol, dir, writer);
+        vol, dir, writer, 0L);
   }

   /**
@@ -78,13 +87,16 @@ public class ReplicaInPipeline extends ReplicaInfo
    * @param vol volume where replica is located
    * @param dir directory path where block and meta files are located
    * @param writer a thread that is writing to this replica
+   * @param bytesToReserve disk space to reserve for this replica, based on
+   *                       the estimated maximum block length.
    */
   ReplicaInPipeline(long blockId, long len, long genStamp,
-      FsVolumeSpi vol, File dir, Thread writer ) {
+      FsVolumeSpi vol, File dir, Thread writer, long bytesToReserve) {
     super( blockId, len, genStamp, vol, dir);
     this.bytesAcked = len;
     this.bytesOnDisk = len;
     this.writer = writer;
+    this.bytesReserved = bytesToReserve;
   }

   /**
@@ -96,6 +108,7 @@ public class ReplicaInPipeline extends ReplicaInfo
     this.bytesAcked = from.getBytesAcked();
     this.bytesOnDisk = from.getBytesOnDisk();
     this.writer = from.writer;
+    this.bytesReserved = from.bytesReserved;
   }

   @Override
@@ -115,13 +128,25 @@ public class ReplicaInPipeline extends ReplicaInfo
   @Override // ReplicaInPipelineInterface
   public void setBytesAcked(long bytesAcked) {
+    long newBytesAcked = bytesAcked - this.bytesAcked;
     this.bytesAcked = bytesAcked;
+
+    // Once bytes are ACK'ed we can release equivalent space from the
+    // volume's reservedForRbw count. We could have released it as soon
+    // as the write-to-disk completed but that would be inefficient.
+    getVolume().releaseReservedSpace(newBytesAcked);
+    bytesReserved -= newBytesAcked;
   }

   @Override // ReplicaInPipelineInterface
   public long getBytesOnDisk() {
     return bytesOnDisk;
   }

+  @Override
+  public long getBytesReserved() {
+    return bytesReserved;
+  }
+
   @Override // ReplicaInPipelineInterface
   public synchronized void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum) {
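To see why setBytesAcked computes a delta first, here is a hedged stand-alone illustration (Volume, Replica, and AckDemo are invented stand-ins, not patch classes): repeated calls with a growing acked count release only the newly acked bytes, so space is never double-released.

import java.util.concurrent.atomic.AtomicLong;

// Stand-in for the volume-side counter (the real one is in FsVolumeImpl).
class Volume {
  final AtomicLong reservedForRbw = new AtomicLong();
  void releaseReservedSpace(long bytes) { reservedForRbw.addAndGet(-bytes); }
}

// Mirrors the setBytesAcked accounting above.
class Replica {
  private final Volume volume;
  private long bytesAcked;
  private long bytesReserved;

  Replica(Volume volume, long bytesToReserve) {
    this.volume = volume;
    this.bytesReserved = bytesToReserve;
    volume.reservedForRbw.addAndGet(bytesToReserve);
  }

  void setBytesAcked(long acked) {
    long newBytesAcked = acked - this.bytesAcked;  // only the new delta
    this.bytesAcked = acked;
    volume.releaseReservedSpace(newBytesAcked);
    bytesReserved -= newBytesAcked;
  }
}

class AckDemo {
  public static void main(String[] args) {
    Volume v = new Volume();
    Replica r = new Replica(v, 1024);
    r.setBytesAcked(100);  // releases 100
    r.setBytesAcked(300);  // releases only 200 more, not 300
    System.out.println(v.reservedForRbw.get());  // prints 724
  }
}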

File: ReplicaInfo.java

@@ -240,6 +240,13 @@ abstract public class ReplicaInfo extends Block implements Replica {
   public void setUnlinked() {
     // no need to be unlinked
   }

+  /**
+   * Number of bytes reserved for this replica on disk.
+   */
+  public long getBytesReserved() {
+    return 0;
+  }
+
   /**
    * Copy specified file into a temporary file. Then rename the

File: FsVolumeSpi.java

@@ -45,4 +45,15 @@ public interface FsVolumeSpi {
   public File getFinalizedDir(String bpid) throws IOException;

   public StorageType getStorageType();
+
+  /**
+   * Reserve disk space for an RBW block so a writer does not run out of
+   * space before the block is full.
+   */
+  public void reserveSpaceForRbw(long bytesToReserve);
+
+  /**
+   * Release disk space previously reserved for an RBW block.
+   */
+  public void releaseReservedSpace(long bytesToRelease);
 }
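A minimal thread-safe implementation of this two-method contract could look like the sketch below (illustrative only; the patch's actual implementation, including the clamp-at-zero failsafe on release, is in FsVolumeImpl further down):

import java.util.concurrent.atomic.AtomicLong;

// Illustrative reservation bookkeeping for an FsVolumeSpi-style volume.
class ReservationTracker {
  private final AtomicLong reservedForRbw = new AtomicLong();

  public void reserveSpaceForRbw(long bytesToReserve) {
    reservedForRbw.addAndGet(bytesToReserve);
  }

  public void releaseReservedSpace(long bytesToRelease) {
    long oldV;
    long newV;
    do {
      oldV = reservedForRbw.get();
      newV = Math.max(0, oldV - bytesToRelease); // never go negative
    } while (!reservedForRbw.compareAndSet(oldV, newV));
  }
}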

File: BlockPoolSlice.java

@@ -296,9 +296,11 @@ class BlockPoolSlice {
           // The restart meta file exists
           if (sc.hasNextLong() && (sc.nextLong() > Time.now())) {
             // It didn't expire. Load the replica as a RBW.
+            // We don't know the expected block length, so just use 0
+            // and don't reserve any more space for writes.
             newReplica = new ReplicaBeingWritten(blockId,
                 validateIntegrityAndSetLength(blockFile, genStamp),
-                genStamp, volume, blockFile.getParentFile(), null);
+                genStamp, volume, blockFile.getParentFile(), null, 0);
             loadRwr = false;
           }
           sc.close();

File: FsDatasetImpl.java

@@ -593,7 +593,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           + " from " + srcfile + " to " + dstfile.getAbsolutePath(), e);
     }
     if (LOG.isDebugEnabled()) {
-      LOG.debug("addBlock: Moved " + srcmeta + " to " + dstmeta
+      LOG.debug("addFinalizedBlock: Moved " + srcmeta + " to " + dstmeta
           + " and " + srcfile + " to " + dstfile);
     }
     return dstfile;
@@ -712,7 +712,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     File oldmeta = replicaInfo.getMetaFile();
     ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
         replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS,
-        v, newBlkFile.getParentFile(), Thread.currentThread());
+        v, newBlkFile.getParentFile(), Thread.currentThread(), estimateBlockLen);
     File newmeta = newReplicaInfo.getMetaFile();

     // rename meta file to rbw directory
@@ -748,7 +748,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     // Replace finalized replica by a RBW replica in replicas map
     volumeMap.add(bpid, newReplicaInfo);
-
+    v.reserveSpaceForRbw(estimateBlockLen - replicaInfo.getNumBytes());
     return newReplicaInfo;
   }
@@ -876,7 +876,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     // create a rbw file to hold block in the designated volume
     File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
     ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
-        b.getGenerationStamp(), v, f.getParentFile());
+        b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes());
     volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
     return newReplicaInfo;
   }
@@ -992,7 +992,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     // create RBW
     final ReplicaBeingWritten rbw = new ReplicaBeingWritten(
         blockId, numBytes, expectedGs,
-        v, dest.getParentFile(), Thread.currentThread());
+        v, dest.getParentFile(), Thread.currentThread(), 0);
     rbw.setBytesAcked(visible);

     // overwrite the RBW in the volume map
     volumeMap.add(b.getBlockPoolId(), rbw);
@@ -1013,7 +1013,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     // create a temporary file to hold block in the designated volume
     File f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
     ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(),
-        b.getGenerationStamp(), v, f.getParentFile());
+        b.getGenerationStamp(), v, f.getParentFile(), 0);
     volumeMap.add(b.getBlockPoolId(), newReplicaInfo);

     return newReplicaInfo;
@@ -1079,7 +1079,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
             " for block " + replicaInfo);
       }
-      File dest = v.addBlock(bpid, replicaInfo, f);
+      File dest = v.addFinalizedBlock(
+          bpid, replicaInfo, f, replicaInfo.getBytesReserved());
       newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
     }
     volumeMap.add(bpid, newReplicaInfo);
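Note that the append path above reserves only the gap between the estimated full block length and the bytes already on disk, while the finalize path releases whatever is still reserved. A worked example of the append arithmetic (class name and values invented):

// Appending to a 128 MB block that already holds 5 MB reserves 123 MB,
// mirroring v.reserveSpaceForRbw(estimateBlockLen - replicaInfo.getNumBytes()).
public class AppendReservationSketch {
  public static void main(String[] args) {
    final long MB = 1024 * 1024;
    long estimateBlockLen = 128 * MB;  // estimated maximum block length
    long numBytes = 5 * MB;            // bytes already written to the replica
    System.out.println((estimateBlockLen - numBytes) / MB);  // prints 123
  }
}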

File: FsVolumeImpl.java

@@ -28,6 +28,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;

 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -61,6 +62,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
   private final DF usage;
   private final long reserved;

+  // Disk space reserved for open blocks.
+  private AtomicLong reservedForRbw;
+
   // Capacity configured. This is useful when we want to
   // limit the visible capacity for tests. If negative, then we just
   // query from the filesystem.
@@ -81,6 +85,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
     this.reserved = conf.getLong(
         DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY,
         DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT);
+    this.reservedForRbw = new AtomicLong(0L);
     this.currentDir = currentDir;
     File parent = currentDir.getParentFile();
     this.usage = new DF(parent, conf);
@@ -165,13 +170,18 @@ public class FsVolumeImpl implements FsVolumeSpi {
   @Override
   public long getAvailable() throws IOException {
-    long remaining = getCapacity()-getDfsUsed();
+    long remaining = getCapacity() - getDfsUsed() - reservedForRbw.get();
     long available = usage.getAvailable();
     if (remaining > available) {
       remaining = available;
     }
     return (remaining > 0) ? remaining : 0;
   }

+  @VisibleForTesting
+  public long getReservedForRbw() {
+    return reservedForRbw.get();
+  }
+
   long getReserved(){
     return reserved;
@@ -216,15 +226,57 @@ public class FsVolumeImpl implements FsVolumeSpi {
     return getBlockPoolSlice(bpid).createTmpFile(b);
   }

+  @Override
+  public void reserveSpaceForRbw(long bytesToReserve) {
+    if (bytesToReserve != 0) {
+      if (FsDatasetImpl.LOG.isDebugEnabled()) {
+        FsDatasetImpl.LOG.debug("Reserving " + bytesToReserve + " on volume " + getBasePath());
+      }
+      reservedForRbw.addAndGet(bytesToReserve);
+    }
+  }
+
+  @Override
+  public void releaseReservedSpace(long bytesToRelease) {
+    if (bytesToRelease != 0) {
+      if (FsDatasetImpl.LOG.isDebugEnabled()) {
+        FsDatasetImpl.LOG.debug("Releasing " + bytesToRelease + " on volume " + getBasePath());
+      }
+
+      long oldReservation, newReservation;
+      do {
+        oldReservation = reservedForRbw.get();
+        newReservation = oldReservation - bytesToRelease;
+        if (newReservation < 0) {
+          // Failsafe: this should never occur in practice, but if it does
+          // we don't want to start advertising more space than we have
+          // available.
+          newReservation = 0;
+        }
+      } while (!reservedForRbw.compareAndSet(oldReservation, newReservation));
+    }
+  }
+
   /**
    * RBW files. They get moved to the finalized block directory when
    * the block is finalized.
    */
   File createRbwFile(String bpid, Block b) throws IOException {
+    reserveSpaceForRbw(b.getNumBytes());
     return getBlockPoolSlice(bpid).createRbwFile(b);
   }

-  File addBlock(String bpid, Block b, File f) throws IOException {
+  /**
+   * Move a finalized block into place and release the disk space that
+   * was reserved for it while it was being written.
+   *
+   * @param bytesReservedForRbw space that was reserved during
+   *     block creation. Now that the block is being finalized we
+   *     can free up this space.
+   * @return the block file in its finalized location
+   * @throws IOException
+   */
+  File addFinalizedBlock(String bpid, Block b,
+      File f, long bytesReservedForRbw)
+      throws IOException {
+    releaseReservedSpace(bytesReservedForRbw);
     return getBlockPoolSlice(bpid).addBlock(b, f);
   }
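The getAvailable change above is what makes reservations visible to block placement: reserved-but-unwritten bytes are subtracted before the volume advertises free space. A worked sketch of that arithmetic (class name and all figures invented):

// Worked example of the getAvailable accounting with RBW reservations.
public class AvailableSpaceSketch {
  static long available(long capacity, long dfsUsed,
                        long reservedForRbw, long diskFree) {
    long remaining = capacity - dfsUsed - reservedForRbw;
    if (remaining > diskFree) {
      remaining = diskFree;  // never advertise more than the disk has
    }
    return Math.max(0, remaining);
  }

  public static void main(String[] args) {
    final long MB = 1024 * 1024;
    // 1 GB volume, 200 MB used, one 128 MB RBW block reserved, 900 MB free.
    System.out.println(available(1024 * MB, 200 * MB, 128 * MB, 900 * MB) / MB);
    // prints 696: the in-flight block's space is held back from new writers.
  }
}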

File: TestDirectoryScanner.java

@@ -424,6 +424,14 @@ public class TestDirectoryScanner {
     public String getStorageID() {
       return "";
     }

+    @Override
+    public void reserveSpaceForRbw(long bytesToReserve) {
+    }
+
+    @Override
+    public void releaseReservedSpace(long bytesToRelease) {
+    }
   }

   private final static TestFsVolumeSpi TEST_VOLUME = new TestFsVolumeSpi();

File: TestRbwSpaceReservation.java (new file)

@@ -0,0 +1,288 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import org.apache.hadoop.fs.DU;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Daemon;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Random;
/**
* Ensure that the DN reserves disk space equivalent to a full block for
* replica being written (RBW).
*/
public class TestRbwSpaceReservation {
static final Log LOG = LogFactory.getLog(TestRbwSpaceReservation.class);
private static final short REPL_FACTOR = 1;
private static final int DU_REFRESH_INTERVAL_MSEC = 500;
private static final int STORAGES_PER_DATANODE = 1;
private static final int BLOCK_SIZE = 1024 * 1024;
private static final int SMALL_BLOCK_SIZE = 1024;
protected MiniDFSCluster cluster;
private Configuration conf;
private DistributedFileSystem fs = null;
private DFSClient client = null;
FsVolumeImpl singletonVolume = null;
private static Random rand = new Random();
private void initConfig(int blockSize) {
conf = new HdfsConfiguration();
// Refresh disk usage information frequently.
conf.setInt(FS_DU_INTERVAL_KEY, DU_REFRESH_INTERVAL_MSEC);
conf.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
// Disable the scanner
conf.setInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
}
static {
((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
}
private void startCluster(int blockSize, long perVolumeCapacity) throws IOException {
initConfig(blockSize);
cluster = new MiniDFSCluster
.Builder(conf)
.storagesPerDatanode(STORAGES_PER_DATANODE)
.numDataNodes(REPL_FACTOR)
.build();
fs = cluster.getFileSystem();
client = fs.getClient();
cluster.waitActive();
if (perVolumeCapacity >= 0) {
List<? extends FsVolumeSpi> volumes =
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
assertThat(volumes.size(), is(1));
singletonVolume = ((FsVolumeImpl) volumes.get(0));
singletonVolume.setCapacityForTesting(perVolumeCapacity);
}
}
@After
public void shutdownCluster() throws IOException {
if (client != null) {
client.close();
client = null;
}
if (fs != null) {
fs.close();
fs = null;
}
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
private void createFileAndTestSpaceReservation(
final String fileNamePrefix, final int fileBlockSize)
throws IOException, InterruptedException {
// Enough for 1 block + meta files + some delta.
final long configuredCapacity = fileBlockSize * 2 - 1;
startCluster(BLOCK_SIZE, configuredCapacity);
FSDataOutputStream out = null;
Path path = new Path("/" + fileNamePrefix + ".dat");
try {
out = fs.create(path, false, 4096, (short) 1, fileBlockSize);
byte[] buffer = new byte[rand.nextInt(fileBlockSize / 4)];
out.write(buffer);
out.hsync();
int bytesWritten = buffer.length;
// Check that space was reserved for a full block minus the bytesWritten.
assertThat(singletonVolume.getReservedForRbw(),
is((long) fileBlockSize - bytesWritten));
out.close();
out = null;
// Check that the reserved space has been released since we closed the
// file.
assertThat(singletonVolume.getReservedForRbw(), is(0L));
// Reopen the file for append and write the same buffer once more.
out = fs.append(path);
out.write(buffer);
out.hsync();
bytesWritten += buffer.length;
// Check that space was again reserved for a full block minus the
// bytesWritten so far.
assertThat(singletonVolume.getReservedForRbw(),
is((long) fileBlockSize - bytesWritten));
// Write once again and verify the reserved space again. This ensures
// that the reserved space is progressively adjusted to account for bytes
// written to disk.
out.write(buffer);
out.hsync();
bytesWritten += buffer.length;
assertThat(singletonVolume.getReservedForRbw(),
is((long) fileBlockSize - bytesWritten));
} finally {
if (out != null) {
out.close();
}
}
}
@Test (timeout=300000)
public void testWithDefaultBlockSize()
throws IOException, InterruptedException {
createFileAndTestSpaceReservation(GenericTestUtils.getMethodName(), BLOCK_SIZE);
}
@Test (timeout=300000)
public void testWithNonDefaultBlockSize()
throws IOException, InterruptedException {
// Same test as previous one, but with a non-default block size.
createFileAndTestSpaceReservation(GenericTestUtils.getMethodName(), BLOCK_SIZE * 2);
}
/**
* Stress test to ensure we are not leaking reserved space.
* @throws IOException
* @throws InterruptedException
*/
@Test (timeout=600000)
public void stressTest() throws IOException, InterruptedException {
final int numWriters = 5;
startCluster(SMALL_BLOCK_SIZE, SMALL_BLOCK_SIZE * numWriters * 10);
Writer[] writers = new Writer[numWriters];
// Start a few writers and let them run for a while.
for (int i = 0; i < numWriters; ++i) {
writers[i] = new Writer(client, SMALL_BLOCK_SIZE);
writers[i].start();
}
Thread.sleep(60000);
// Stop the writers.
for (Writer w : writers) {
w.stopWriter();
}
int filesCreated = 0;
int numFailures = 0;
for (Writer w : writers) {
w.join();
filesCreated += w.getFilesCreated();
numFailures += w.getNumFailures();
}
LOG.info("Stress test created " + filesCreated +
" files and hit " + numFailures + " failures");
// Check no space was leaked.
assertThat(singletonVolume.getReservedForRbw(), is(0L));
}
private static class Writer extends Daemon {
private volatile boolean keepRunning;
private final DFSClient localClient;
private int filesCreated = 0;
private int numFailures = 0;
byte[] data;
Writer(DFSClient client, int blockSize) throws IOException {
localClient = client;
keepRunning = true;
filesCreated = 0;
numFailures = 0;
// At least some of the files should span a block boundary.
data = new byte[blockSize * 2];
}
@Override
public void run() {
/**
* Create a file, write up to 3 blocks of data and close the file.
* Do this in a loop until we are told to stop.
*/
while (keepRunning) {
OutputStream os = null;
try {
String filename = "/file-" + rand.nextLong();
os = localClient.create(filename, false);
os.write(data, 0, rand.nextInt(data.length));
IOUtils.closeQuietly(os);
os = null;
localClient.delete(filename, false);
Thread.sleep(50); // Sleep for a bit to avoid killing the system.
++filesCreated;
} catch (IOException ioe) {
// Just ignore the exception and keep going.
++numFailures;
} catch (InterruptedException ie) {
return;
} finally {
if (os != null) {
IOUtils.closeQuietly(os);
}
}
}
}
public void stopWriter() {
keepRunning = false;
}
public int getFilesCreated() {
return filesCreated;
}
public int getNumFailures() {
return numFailures;
}
}
}

File: TestWriteToReplica.java

@@ -158,7 +158,7 @@ public class TestWriteToReplica {
     replicasMap.add(bpid, new ReplicaInPipeline(
         blocks[TEMPORARY].getBlockId(),
         blocks[TEMPORARY].getGenerationStamp(), vol,
-        vol.createTmpFile(bpid, blocks[TEMPORARY].getLocalBlock()).getParentFile()));
+        vol.createTmpFile(bpid, blocks[TEMPORARY].getLocalBlock()).getParentFile(), 0));

     replicaInfo = new ReplicaBeingWritten(blocks[RBW].getLocalBlock(), vol,
         vol.createRbwFile(bpid, blocks[RBW].getLocalBlock()).getParentFile(), null);