HDFS-8157. Writes to RAM DISK reserve locked memory for block files. (Arpit Agarwal)

Author: Arpit Agarwal, 2015-05-16 09:05:35 -07:00
parent b0ad644083, commit e453989a57
20 changed files with 497 additions and 82 deletions
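In outline: writes to RAM disk now draw on the same locked-memory budget (DFS_DATANODE_MAX_LOCKED_MEMORY_KEY) that centralized cache management uses. createRbw() reserves the full block size up front, rounded up to the OS page size; finalizeBlock() returns the unused tail of a short replica, rounded down; and block-file deletion or eviction releases the remaining page-rounded charge. When the reservation fails, block placement falls back to persistent storage.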

File: CHANGES.txt

@ -560,6 +560,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8394. Move getAdditionalBlock() and related functionalities into a
separate class. (wheat9)
HDFS-8157. Writes to RAM DISK reserve locked memory for block files.
(Arpit Agarwal)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

File: ReplicaInPipeline.java

@ -51,6 +51,7 @@ public class ReplicaInPipeline extends ReplicaInfo
* the bytes already written to this block.
*/
private long bytesReserved;
private final long originalBytesReserved;
/**
* Constructor for a zero length replica
@ -97,6 +98,7 @@ public class ReplicaInPipeline extends ReplicaInfo
this.bytesOnDisk = len;
this.writer = writer;
this.bytesReserved = bytesToReserve;
this.originalBytesReserved = bytesToReserve;
}
/**
@ -109,6 +111,7 @@ public class ReplicaInPipeline extends ReplicaInfo
this.bytesOnDisk = from.getBytesOnDisk();
this.writer = from.writer;
this.bytesReserved = from.bytesReserved;
this.originalBytesReserved = from.originalBytesReserved;
}
@Override
@ -148,9 +151,15 @@ public class ReplicaInPipeline extends ReplicaInfo
return bytesReserved;
}
@Override
public long getOriginalBytesReserved() {
return originalBytesReserved;
}
@Override
public void releaseAllBytesReserved() { // ReplicaInPipelineInterface
getVolume().releaseReservedSpace(bytesReserved);
getVolume().releaseLockedMemory(bytesReserved);
bytesReserved = 0;
}

File: ReplicaInfo.java

@ -219,6 +219,16 @@ abstract public class ReplicaInfo extends Block implements Replica {
return 0;
}
/**
* Number of bytes originally reserved for this replica. The actual
* reservation is adjusted as data is written to disk.
*
* @return the number of bytes originally reserved for this replica.
*/
public long getOriginalBytesReserved() {
return 0;
}
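The default implementation above returns 0; ReplicaInPipeline overrides it to remember the reservation made at creation time. The gap between that original reservation and the bytes finally written is what finalization can hand back to the locked-memory pool (see the finalizeBlock() hunk further below). A toy illustration with hypothetical numbers, not taken from the patch:

    public class ReservationGap {
      public static void main(String[] args) {
        long originalBytesReserved = 128L * 1024 * 1024; // full block reserved at create time
        long bytesWritten = 1L * 1024 * 1024;            // replica finalized short
        // Releasable on finalize: originalBytesReserved - getNumBytes().
        System.out.println(originalBytesReserved - bytesWritten); // 133169152
      }
    }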
/**
* Copy specified file into a temporary file. Then rename the
* temporary file to the original name. This will cause any

File: FsVolumeSpi.java

@ -72,6 +72,14 @@ public interface FsVolumeSpi {
*/
public void releaseReservedSpace(long bytesToRelease);
/**
* Release reserved memory for an RBW block written to transient storage,
* i.e. RAM.
* bytesToRelease will be rounded down to the OS page size since locked
* memory reservation must always be a multiple of the page size.
*/
public void releaseLockedMemory(long bytesToRelease);
/**
* BlockIterator will return ExtendedBlock entries from a block pool in
* this volume. The entries will be returned in sorted order.<p/>

File: BlockPoolSlice.java

@ -475,7 +475,7 @@ class BlockPoolSlice {
// eventually.
if (newReplica.getVolume().isTransientStorage()) {
lazyWriteReplicaMap.addReplica(bpid, blockId,
(FsVolumeImpl) newReplica.getVolume());
(FsVolumeImpl) newReplica.getVolume(), 0);
} else {
lazyWriteReplicaMap.discardReplica(bpid, blockId, false);
}

File: FsDatasetAsyncDiskService.java

@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.File;
import java.io.FileDescriptor;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@ -277,7 +276,8 @@ class FsDatasetAsyncDiskService {
@Override
public void run() {
long dfsBytes = blockFile.length() + metaFile.length();
final long blockLength = blockFile.length();
final long metaLength = metaFile.length();
boolean result;
result = (trashDirectory == null) ? deleteFiles() : moveFiles();
@ -291,7 +291,8 @@ class FsDatasetAsyncDiskService {
if(block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){
datanode.notifyNamenodeDeletedBlock(block, volume.getStorageID());
}
volume.decDfsUsed(block.getBlockPoolId(), dfsBytes);
volume.onBlockFileDeletion(block.getBlockPoolId(), blockLength);
volume.onMetaFileDeletion(block.getBlockPoolId(), metaLength);
LOG.info("Deleted " + block.getBlockPoolId() + " "
+ block.getLocalBlock() + " file " + blockFile);
}

File: FsDatasetCache.java

@ -151,10 +151,15 @@ public class FsDatasetCache {
/**
* Round up a number to the operating system page size.
*/
public long round(long count) {
long newCount =
(count + (osPageSize - 1)) / osPageSize;
return newCount * osPageSize;
public long roundUp(long count) {
return (count + osPageSize - 1) & (~(osPageSize - 1));
}
/**
* Round down a number to the operating system page size.
*/
public long roundDown(long count) {
return count & (~(osPageSize - 1));
}
}
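The replacement roundUp()/roundDown() above use bit masking, which is correct only when osPageSize is a power of two (true on common platforms; the old divide-and-multiply round() had no such requirement). A minimal standalone sketch with a hypothetical 4 KB page:

    public class RounderSketch {
      static final long PAGE = 4096; // hypothetical page size, must be a power of two

      static long roundUp(long count)   { return (count + PAGE - 1) & ~(PAGE - 1); }
      static long roundDown(long count) { return count & ~(PAGE - 1); }

      public static void main(String[] args) {
        System.out.println(roundUp(1));      // 4096: a partial page locks a whole page
        System.out.println(roundUp(4096));   // 4096: exact multiples are unchanged
        System.out.println(roundDown(4097)); // 4096: releases never exceed the reservation
      }
    }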
@ -173,7 +178,7 @@ public class FsDatasetCache {
* -1 if we failed.
*/
long reserve(long count) {
count = rounder.round(count);
count = rounder.roundUp(count);
while (true) {
long cur = usedBytes.get();
long next = cur + count;
@ -195,7 +200,20 @@ public class FsDatasetCache {
* @return The new number of usedBytes.
*/
long release(long count) {
count = rounder.round(count);
count = rounder.roundUp(count);
return usedBytes.addAndGet(-count);
}
/**
* Release some bytes that we're using rounded down to the page size.
*
* @param count The number of bytes to release. We will round this
* down to the page size.
*
* @return The new number of usedBytes.
*/
long releaseRoundDown(long count) {
count = rounder.roundDown(count);
return usedBytes.addAndGet(-count);
}
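The hunk shows only the first two lines of reserve()'s retry loop. A plausible reconstruction of the lock-free pattern, assuming usedBytes is an AtomicLong and maxBytes is the configured locked-memory limit (both inferred from the visible fragments, not verbatim source):

    import java.util.concurrent.atomic.AtomicLong;

    class UsedBytesCountSketch {
      private final AtomicLong usedBytes = new AtomicLong(0);
      private final long maxBytes;

      UsedBytesCountSketch(long maxBytes) { this.maxBytes = maxBytes; }

      long reserve(long count) { // count is already page-rounded by the caller
        while (true) {
          long cur = usedBytes.get();
          long next = cur + count;
          if (next > maxBytes) {
            return -1;          // over the locked-memory limit; reservation denied
          }
          if (usedBytes.compareAndSet(cur, next)) {
            return next;        // CAS won; the reservation is recorded
          }
          // CAS lost a race with another thread; re-read and retry
        }
      }

      long release(long count) {
        return usedBytes.addAndGet(-count);
      }
    }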
@ -340,6 +358,52 @@ public class FsDatasetCache {
}
}
/**
* Try to reserve more bytes.
*
* @param count The number of bytes to add. We will round this
* up to the page size.
*
* @return The new number of usedBytes if we succeeded;
* -1 if we failed.
*/
long reserve(long count) {
return usedBytesCount.reserve(count);
}
/**
* Release some bytes that we're using.
*
* @param count The number of bytes to release. We will round this
* up to the page size.
*
* @return The new number of usedBytes.
*/
long release(long count) {
return usedBytesCount.release(count);
}
/**
* Release some bytes that we're using rounded down to the page size.
*
* @param count The number of bytes to release. We will round this
* down to the page size.
*
* @return The new number of usedBytes.
*/
long releaseRoundDown(long count) {
return usedBytesCount.releaseRoundDown(count);
}
/**
* Get the OS page size.
*
* @return the OS page size.
*/
long getOsPageSize() {
return usedBytesCount.rounder.osPageSize;
}
/**
* Background worker that mmaps, mlocks, and checksums a block
*/
@ -363,7 +427,7 @@ public class FsDatasetCache {
MappableBlock mappableBlock = null;
ExtendedBlock extBlk = new ExtendedBlock(key.getBlockPoolId(),
key.getBlockId(), length, genstamp);
long newUsedBytes = usedBytesCount.reserve(length);
long newUsedBytes = reserve(length);
boolean reservedBytes = false;
try {
if (newUsedBytes < 0) {
@ -423,7 +487,7 @@ public class FsDatasetCache {
IOUtils.closeQuietly(metaIn);
if (!success) {
if (reservedBytes) {
usedBytesCount.release(length);
release(length);
}
LOG.debug("Caching of {} was aborted. We are now caching only {} "
+ "bytes in total.", key, usedBytesCount.get());
@ -502,8 +566,7 @@ public class FsDatasetCache {
synchronized (FsDatasetCache.this) {
mappableBlockMap.remove(key);
}
long newUsedBytes =
usedBytesCount.release(value.mappableBlock.getLength());
long newUsedBytes = release(value.mappableBlock.getLength());
numBlocksCached.addAndGet(-1);
dataset.datanode.getMetrics().incrBlocksUncached(1);
if (revocationTimeMs != 0) {

File: FsDatasetImpl.java

@ -319,8 +319,18 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
cacheManager = new FsDatasetCache(this);
// Start the lazy writer once we have built the replica maps.
// We need to start the lazy writer even if MaxLockedMemory is set to
// zero because we may have un-persisted replicas in memory from before
// the process restart. To minimize the chances of data loss we'll
// ensure they get written to disk now.
if (ramDiskReplicaTracker.numReplicasNotPersisted() > 0 ||
datanode.getDnConf().getMaxLockedMemory() > 0) {
lazyWriter = new Daemon(new LazyWriter(conf));
lazyWriter.start();
} else {
lazyWriter = null;
}
registerMBean(datanode.getDatanodeUuid());
// Add a Metrics2 Source Interface. This is same
@ -1284,26 +1294,33 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
" and thus cannot be created.");
}
// create a new block
FsVolumeReference ref;
while (true) {
FsVolumeReference ref = null;
// Use ramdisk only if block size is a multiple of OS page size.
// This simplifies reservation for partially used replicas
// significantly.
if (allowLazyPersist &&
lazyWriter != null &&
b.getNumBytes() % cacheManager.getOsPageSize() == 0 &&
(cacheManager.reserve(b.getNumBytes())) > 0) {
try {
if (allowLazyPersist) {
// First try to place the block on a transient volume.
ref = volumes.getNextTransientVolume(b.getNumBytes());
datanode.getMetrics().incrRamDiskBlocksWrite();
} else {
} catch(DiskOutOfSpaceException de) {
// Ignore the exception since we just fall back to persistent storage.
datanode.getMetrics().incrRamDiskBlocksWriteFallback();
} finally {
if (ref == null) {
cacheManager.release(b.getNumBytes());
}
}
}
if (ref == null) {
ref = volumes.getNextVolume(storageType, b.getNumBytes());
}
} catch (DiskOutOfSpaceException de) {
if (allowLazyPersist) {
datanode.getMetrics().incrRamDiskBlocksWriteFallback();
allowLazyPersist = false;
continue;
}
throw de;
}
break;
}
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
// create an rbw file to hold block in the designated volume
File f;
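The added and removed lines of this hunk are interleaved without +/- markers, which makes the new placement flow hard to follow. A best-effort reconstruction of the post-patch logic in createRbw(), assembled only from lines visible above:

    FsVolumeReference ref = null;

    // Use ramdisk only if block size is a multiple of OS page size.
    // This simplifies reservation for partially used replicas significantly.
    if (allowLazyPersist &&
        lazyWriter != null &&
        b.getNumBytes() % cacheManager.getOsPageSize() == 0 &&
        (cacheManager.reserve(b.getNumBytes())) > 0) {
      try {
        // First try to place the block on a transient volume.
        ref = volumes.getNextTransientVolume(b.getNumBytes());
        datanode.getMetrics().incrRamDiskBlocksWrite();
      } catch (DiskOutOfSpaceException de) {
        // Ignore the exception since we just fall back to persistent storage.
        datanode.getMetrics().incrRamDiskBlocksWriteFallback();
      } finally {
        if (ref == null) {
          // Transient placement failed; return the locked-memory reservation.
          cacheManager.release(b.getNumBytes());
        }
      }
    }

    if (ref == null) {
      ref = volumes.getNextVolume(storageType, b.getNumBytes());
    }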
@ -1564,7 +1581,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
if (v.isTransientStorage()) {
ramDiskReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v);
releaseLockedMemory(
replicaInfo.getOriginalBytesReserved() - replicaInfo.getNumBytes(),
false);
ramDiskReplicaTracker.addReplica(
bpid, replicaInfo.getBlockId(), v, replicaInfo.getNumBytes());
datanode.getMetrics().addRamDiskBytesWrite(replicaInfo.getNumBytes());
}
}
@ -1811,9 +1832,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
/**
* We're informed that a block is no longer valid. We
* could lazily garbage-collect the block, but why bother?
* just get rid of it.
* We're informed that a block is no longer valid. Delete it.
*/
@Override // FsDatasetSpi
public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
@ -2064,8 +2083,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
public void shutdown() {
fsRunning = false;
if (lazyWriter != null) {
((LazyWriter) lazyWriter.getRunnable()).stop();
lazyWriter.interrupt();
}
if (mbeanName != null) {
MBeans.unregister(mbeanName);
@ -2083,6 +2104,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
volumes.shutdown();
}
if (lazyWriter != null) {
try {
lazyWriter.join();
} catch (InterruptedException ie) {
@ -2090,6 +2112,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
"from LazyWriter.join");
}
}
}
@Override // FSDatasetMBean
public String getStorageInfo() {
@ -2173,7 +2196,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
diskFile.length(), diskGS, vol, diskFile.getParentFile());
volumeMap.add(bpid, diskBlockInfo);
if (vol.isTransientStorage()) {
ramDiskReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol);
long lockedBytesReserved =
cacheManager.reserve(diskBlockInfo.getNumBytes()) > 0 ?
diskBlockInfo.getNumBytes() : 0;
ramDiskReplicaTracker.addReplica(
bpid, blockId, (FsVolumeImpl) vol, lockedBytesReserved);
}
LOG.warn("Added missing block to memory " + diskBlockInfo);
return;
@ -2760,12 +2787,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
boolean ramDiskConfigured = ramDiskConfigured();
// Add thread for DISK volume if RamDisk is configured
if (ramDiskConfigured &&
asyncLazyPersistService != null &&
!asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
asyncLazyPersistService.addVolume(v.getCurrentDir());
}
// Remove thread for DISK volume if RamDisk is not configured
if (!ramDiskConfigured &&
asyncLazyPersistService != null &&
asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
asyncLazyPersistService.removeVolume(v.getCurrentDir());
}
@ -2790,9 +2819,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// Remove the old replicas
if (blockFile.delete() || !blockFile.exists()) {
((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, blockFileUsed);
FsVolumeImpl volume = (FsVolumeImpl) replicaInfo.getVolume();
volume.onBlockFileDeletion(bpid, blockFileUsed);
if (metaFile.delete() || !metaFile.exists()) {
((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, metaFileUsed);
volume.onMetaFileDeletion(bpid, metaFileUsed);
}
}
@ -2905,8 +2935,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
/**
* Attempt to evict one or more transient block replicas we have at least
* spaceNeeded bytes free.
* Attempt to evict one or more transient block replicas until we
* have at least spaceNeeded bytes free.
*/
private void evictBlocks() throws IOException {
int iterations = 0;
@ -3056,5 +3086,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
s.add(blockId);
}
}
void releaseLockedMemory(long count, boolean roundup) {
if (roundup) {
cacheManager.release(count);
} else {
cacheManager.releaseRoundDown(count);
}
}
}
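A worked example of why releaseLockedMemory() rounds in two directions (semantics inferred from the hunks above): deletion releases the same rounded-up amount that reserve() charged, while a short finalize rounds down so the partially filled last page stays locked.

    public class ReleaseRounding {
      static final long PAGE = 4096; // hypothetical page size

      static long roundUp(long n)   { return (n + PAGE - 1) & ~(PAGE - 1); }
      static long roundDown(long n) { return n & ~(PAGE - 1); }

      public static void main(String[] args) {
        long reserved = 3 * PAGE; // createRbw() reserved a 12288-byte block
        long written  = 5000;     // replica finalized short of the block size

        // finalizeBlock(): release the unused tail rounded DOWN; the
        // half-written last page remains locked.
        long locked = reserved - roundDown(reserved - written); // 8192

        // onBlockFileDeletion(): release the block length rounded UP,
        // which frees exactly the pages still locked.
        locked -= roundUp(written);
        System.out.println(locked); // 0: the counter balances out
      }
    }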

File: FsVolumeImpl.java

@ -274,7 +274,18 @@ public class FsVolumeImpl implements FsVolumeSpi {
return getBlockPoolSlice(bpid).getTmpDir();
}
void decDfsUsed(String bpid, long value) {
void onBlockFileDeletion(String bpid, long value) {
decDfsUsed(bpid, value);
if (isTransientStorage()) {
dataset.releaseLockedMemory(value, true);
}
}
void onMetaFileDeletion(String bpid, long value) {
decDfsUsed(bpid, value);
}
private void decDfsUsed(String bpid, long value) {
synchronized(dataset) {
BlockPoolSlice bp = bpSlices.get(bpid);
if (bp != null) {
@ -428,6 +439,13 @@ public class FsVolumeImpl implements FsVolumeSpi {
}
}
@Override
public void releaseLockedMemory(long bytesToRelease) {
if (isTransientStorage()) {
dataset.releaseLockedMemory(bytesToRelease, false);
}
}
private enum SubdirFilter implements FilenameFilter {
INSTANCE;

File: RamDiskReplicaLruTracker.java

@ -38,8 +38,10 @@ public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker {
private class RamDiskReplicaLru extends RamDiskReplica {
long lastUsedTime;
private RamDiskReplicaLru(String bpid, long blockId, FsVolumeImpl ramDiskVolume) {
super(bpid, blockId, ramDiskVolume);
private RamDiskReplicaLru(String bpid, long blockId,
FsVolumeImpl ramDiskVolume,
long lockedBytesReserved) {
super(bpid, blockId, ramDiskVolume, lockedBytesReserved);
}
@Override
@ -70,20 +72,23 @@ public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker {
TreeMultimap<Long, RamDiskReplicaLru> replicasPersisted;
RamDiskReplicaLruTracker() {
replicaMaps = new HashMap<String, Map<Long, RamDiskReplicaLru>>();
replicasNotPersisted = new LinkedList<RamDiskReplicaLru>();
replicaMaps = new HashMap<>();
replicasNotPersisted = new LinkedList<>();
replicasPersisted = TreeMultimap.create();
}
@Override
synchronized void addReplica(final String bpid, final long blockId,
final FsVolumeImpl transientVolume) {
final FsVolumeImpl transientVolume,
long lockedBytesReserved) {
Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
if (map == null) {
map = new HashMap<Long, RamDiskReplicaLru>();
map = new HashMap<>();
replicaMaps.put(bpid, map);
}
RamDiskReplicaLru ramDiskReplicaLru = new RamDiskReplicaLru(bpid, blockId, transientVolume);
RamDiskReplicaLru ramDiskReplicaLru =
new RamDiskReplicaLru(bpid, blockId, transientVolume,
lockedBytesReserved);
map.put(blockId, ramDiskReplicaLru);
replicasNotPersisted.add(ramDiskReplicaLru);
}

File: RamDiskReplicaTracker.java

@ -45,6 +45,7 @@ public abstract class RamDiskReplicaTracker {
private final long blockId;
private File savedBlockFile;
private File savedMetaFile;
private long lockedBytesReserved;
private long creationTime;
protected AtomicLong numReads = new AtomicLong(0);
@ -61,10 +62,12 @@ public abstract class RamDiskReplicaTracker {
FsVolumeImpl lazyPersistVolume;
RamDiskReplica(final String bpid, final long blockId,
final FsVolumeImpl ramDiskVolume) {
final FsVolumeImpl ramDiskVolume,
long lockedBytesReserved) {
this.bpid = bpid;
this.blockId = blockId;
this.ramDiskVolume = ramDiskVolume;
this.lockedBytesReserved = lockedBytesReserved;
lazyPersistVolume = null;
savedMetaFile = null;
savedBlockFile = null;
@ -168,6 +171,10 @@ public abstract class RamDiskReplicaTracker {
public String toString() {
return "[BlockPoolID=" + bpid + "; BlockId=" + blockId + "]";
}
public long getLockedBytesReserved() {
return lockedBytesReserved;
}
}
/**
@ -201,7 +208,8 @@ public abstract class RamDiskReplicaTracker {
* @param transientVolume RAM disk volume that stores the replica.
*/
abstract void addReplica(final String bpid, final long blockId,
final FsVolumeImpl transientVolume);
final FsVolumeImpl transientVolume,
long lockedBytesReserved);
/**
* Invoked when a replica is opened by a client. This may be used as

File: MiniDFSCluster.java

@ -1582,7 +1582,7 @@ public class MiniDFSCluster {
throw new IllegalStateException("Attempting to finalize "
+ "Namenode but it is not running");
}
ToolRunner.run(new DFSAdmin(conf), new String[] {"-finalizeUpgrade"});
ToolRunner.run(new DFSAdmin(conf), new String[]{"-finalizeUpgrade"});
}
/**

File: TestBalancer.java

@ -66,6 +66,7 @@ import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
import org.apache.hadoop.hdfs.server.balancer.Balancer.Result;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
@ -120,13 +121,16 @@ public class TestBalancer {
conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
}
static void initConfWithRamDisk(Configuration conf) {
static void initConfWithRamDisk(Configuration conf,
long ramDiskCapacity) {
conf.setLong(DFS_BLOCK_SIZE_KEY, DEFAULT_RAM_DISK_BLOCK_SIZE);
conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, ramDiskCapacity);
conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC, 3);
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1);
conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES, DEFAULT_RAM_DISK_BLOCK_SIZE);
LazyPersistTestCase.initCacheManipulator();
}
/* create a file with a length of <code>fileLen</code> */
@ -1245,7 +1249,6 @@ public class TestBalancer {
final int SEED = 0xFADED;
final short REPL_FACT = 1;
Configuration conf = new Configuration();
initConfWithRamDisk(conf);
final int defaultRamDiskCapacity = 10;
final long ramDiskStorageLimit =
@ -1255,6 +1258,8 @@ public class TestBalancer {
((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) +
(DEFAULT_RAM_DISK_BLOCK_SIZE - 1);
initConfWithRamDisk(conf, ramDiskStorageLimit);
cluster = new MiniDFSCluster
.Builder(conf)
.numDataNodes(1)

File: SimulatedFSDataset.java

@ -491,6 +491,10 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
public void reserveSpaceForRbw(long bytesToReserve) {
}
@Override
public void releaseLockedMemory(long bytesToRelease) {
}
@Override
public void releaseReservedSpace(long bytesToRelease) {
}

File: TestDirectoryScanner.java

@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Test;
@ -79,6 +80,8 @@ public class TestDirectoryScanner {
CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_LENGTH);
CONF.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
CONF.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
Long.MAX_VALUE);
}
/** create a file with a length of <code>fileLen</code> */
@ -308,6 +311,7 @@ public class TestDirectoryScanner {
@Test (timeout=300000)
public void testRetainBlockOnPersistentStorage() throws Exception {
LazyPersistTestCase.initCacheManipulator();
cluster = new MiniDFSCluster
.Builder(CONF)
.storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
@ -349,6 +353,7 @@ public class TestDirectoryScanner {
@Test (timeout=300000)
public void testDeleteBlockOnTransientStorage() throws Exception {
LazyPersistTestCase.initCacheManipulator();
cluster = new MiniDFSCluster
.Builder(CONF)
.storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
@ -614,6 +619,10 @@ public class TestDirectoryScanner {
public void releaseReservedSpace(long bytesToRelease) {
}
@Override
public void releaseLockedMemory(long bytesToRelease) {
}
@Override
public BlockIterator newBlockIterator(String bpid, String name) {
throw new UnsupportedOperationException();

File: TestFsDatasetCache.java

@ -339,7 +339,7 @@ public class TestFsDatasetCache {
for (int i=0; i<numFiles-1; i++) {
setHeartbeatResponse(cacheBlocks(fileLocs[i]));
total = DFSTestUtil.verifyExpectedCacheUsage(
rounder.round(total + fileSizes[i]), 4 * (i + 1), fsd);
rounder.roundUp(total + fileSizes[i]), 4 * (i + 1), fsd);
}
// nth file should hit a capacity exception
@ -365,7 +365,7 @@ public class TestFsDatasetCache {
int curCachedBlocks = 16;
for (int i=0; i<numFiles-1; i++) {
setHeartbeatResponse(uncacheBlocks(fileLocs[i]));
long uncachedBytes = rounder.round(fileSizes[i]);
long uncachedBytes = rounder.roundUp(fileSizes[i]);
total -= uncachedBytes;
curCachedBlocks -= uncachedBytes / BLOCK_SIZE;
DFSTestUtil.verifyExpectedCacheUsage(total, curCachedBlocks, fsd);

File: ExternalVolumeImpl.java

@ -81,6 +81,10 @@ public class ExternalVolumeImpl implements FsVolumeSpi {
public void releaseReservedSpace(long bytesToRelease) {
}
@Override
public void releaseLockedMemory(long bytesToRelease) {
}
@Override
public BlockIterator newBlockIterator(String bpid, String name) {
return null;

File: LazyPersistTestCase.java

@ -23,16 +23,7 @@ import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
import static org.apache.hadoop.fs.StorageType.DEFAULT;
import static org.apache.hadoop.fs.StorageType.RAM_DISK;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
@ -40,6 +31,7 @@ import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashSet;
@ -68,6 +60,7 @@ import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.tools.JMXGet;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
@ -80,8 +73,8 @@ public abstract class LazyPersistTestCase {
static final byte LAZY_PERSIST_POLICY_ID = (byte) 15;
static {
DFSTestUtil.setNameNodeLogLevel(Level.ALL);
GenericTestUtils.setLogLevel(FsDatasetImpl.LOG, Level.ALL);
DFSTestUtil.setNameNodeLogLevel(Level.DEBUG);
GenericTestUtils.setLogLevel(FsDatasetImpl.LOG, Level.DEBUG);
}
protected static final int BLOCK_SIZE = 5 * 1024 * 1024;
@ -95,6 +88,8 @@ public abstract class LazyPersistTestCase {
protected static final int LAZY_WRITER_INTERVAL_SEC = 1;
protected static final Log LOG = LogFactory.getLog(LazyPersistTestCase.class);
protected static final short REPL_FACTOR = 1;
protected final long osPageSize =
NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();
protected MiniDFSCluster cluster;
protected DistributedFileSystem fs;
@ -242,10 +237,12 @@ public abstract class LazyPersistTestCase {
int ramDiskReplicaCapacity,
long ramDiskStorageLimit,
long evictionLowWatermarkReplicas,
long maxLockedMemory,
boolean useSCR,
boolean useLegacyBlockReaderLocal,
boolean disableScrubber) throws IOException {
initCacheManipulator();
Configuration conf = new Configuration();
conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
if (disableScrubber) {
@ -262,6 +259,7 @@ public abstract class LazyPersistTestCase {
conf.setLong(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES,
evictionLowWatermarkReplicas * BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1);
conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, maxLockedMemory);
if (useSCR) {
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
@ -311,6 +309,31 @@ public abstract class LazyPersistTestCase {
LOG.info("Cluster startup complete");
}
/**
* Use a dummy cache manipulator for testing.
*/
public static void initCacheManipulator() {
NativeIO.POSIX.setCacheManipulator(new NativeIO.POSIX.CacheManipulator() {
@Override
public void mlock(String identifier,
ByteBuffer mmap, long length) throws IOException {
LOG.info("LazyPersistTestCase: faking mlock of " + identifier + " bytes.");
}
@Override
public long getMemlockLimit() {
LOG.info("LazyPersistTestCase: fake return " + Long.MAX_VALUE);
return Long.MAX_VALUE;
}
@Override
public boolean verifyCanMlock() {
LOG.info("LazyPersistTestCase: fake return " + true);
return true;
}
});
}
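Tests outside this package (see the TestBalancer and TestDirectoryScanner hunks above) call LazyPersistTestCase.initCacheManipulator() before cluster startup, so mlock is faked and locked-memory reservations succeed regardless of the build machine's memlock ulimit.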
ClusterWithRamDiskBuilder getClusterBuilder() {
return new ClusterWithRamDiskBuilder();
}
@ -344,6 +367,11 @@ public abstract class LazyPersistTestCase {
return this;
}
public ClusterWithRamDiskBuilder setMaxLockedMemory(long maxLockedMemory) {
this.maxLockedMemory = maxLockedMemory;
return this;
}
public ClusterWithRamDiskBuilder setUseScr(boolean useScr) {
this.useScr = useScr;
return this;
@ -376,13 +404,14 @@ public abstract class LazyPersistTestCase {
LazyPersistTestCase.this.startUpCluster(
numDatanodes, hasTransientStorage, storageTypes, ramDiskReplicaCapacity,
ramDiskStorageLimit, evictionLowWatermarkReplicas,
useScr, useLegacyBlockReaderLocal,disableScrubber);
maxLockedMemory, useScr, useLegacyBlockReaderLocal, disableScrubber);
}
private int numDatanodes = REPL_FACTOR;
private StorageType[] storageTypes = null;
private int ramDiskReplicaCapacity = -1;
private long ramDiskStorageLimit = -1;
private long maxLockedMemory = Long.MAX_VALUE;
private boolean hasTransientStorage = true;
private boolean useScr = false;
private boolean useLegacyBlockReaderLocal = false;

File: TestLazyPersistLockedMemory.java (new file)

@ -0,0 +1,201 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.base.Supplier;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts;
import org.junit.Test;
import java.io.IOException;
import java.util.EnumSet;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
import static org.apache.hadoop.fs.StorageType.DEFAULT;
import static org.apache.hadoop.fs.StorageType.RAM_DISK;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
/**
* Verify that locked memory is used correctly when writing to replicas in
* memory.
*/
public class TestLazyPersistLockedMemory extends LazyPersistTestCase {
/**
* RAM disk present but locked memory is set to zero. Placement should
* fall back to disk.
*/
@Test
public void testWithNoLockedMemory() throws IOException {
getClusterBuilder().setNumDatanodes(1)
.setMaxLockedMemory(0).build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
makeTestFile(path, BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(path, DEFAULT);
}
@Test
public void testReservation()
throws IOException, TimeoutException, InterruptedException {
getClusterBuilder().setNumDatanodes(1)
.setMaxLockedMemory(BLOCK_SIZE).build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
final FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
// Create a file and ensure the replica in RAM_DISK uses locked memory.
Path path = new Path("/" + METHOD_NAME + ".dat");
makeTestFile(path, BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(path, RAM_DISK);
assertThat(fsd.getCacheUsed(), is((long) BLOCK_SIZE));
}
@Test
public void testReleaseOnFileDeletion()
throws IOException, TimeoutException, InterruptedException {
getClusterBuilder().setNumDatanodes(1)
.setMaxLockedMemory(BLOCK_SIZE).build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
final FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
Path path = new Path("/" + METHOD_NAME + ".dat");
makeTestFile(path, BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(path, RAM_DISK);
assertThat(fsd.getCacheUsed(), is((long) BLOCK_SIZE));
// Delete the file and ensure that the locked memory is released.
fs.delete(path, false);
DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
waitForLockedBytesUsed(fsd, 0);
}
/**
* Verify that locked RAM is released when blocks are evicted from RAM disk.
*/
@Test
public void testReleaseOnEviction()
throws IOException, TimeoutException, InterruptedException {
getClusterBuilder().setNumDatanodes(1)
.setMaxLockedMemory(BLOCK_SIZE)
.setRamDiskReplicaCapacity(BLOCK_SIZE * 2 - 1)
.build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
final FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
Path path = new Path("/" + METHOD_NAME + ".dat");
makeTestFile(path, BLOCK_SIZE, true);
// The block should get evicted soon since it pushes RAM disk free
// space below the threshold.
waitForLockedBytesUsed(fsd, 0);
MetricsRecordBuilder rb =
MetricsAsserts.getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
MetricsAsserts.assertCounter("RamDiskBlocksEvicted", 1L, rb);
}
/**
* Verify that locked bytes are correctly updated when a block is finalized
* at less than its max length.
*/
@Test
public void testShortBlockFinalized()
throws IOException, TimeoutException, InterruptedException {
getClusterBuilder().setNumDatanodes(1).build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
final FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
Path path = new Path("/" + METHOD_NAME + ".dat");
makeTestFile(path, 1, true);
assertThat(fsd.getCacheUsed(), is(osPageSize));
// Delete the file and ensure locked RAM usage goes to zero.
fs.delete(path, false);
waitForLockedBytesUsed(fsd, 0);
}
/**
* Verify that locked bytes are correctly updated when the client goes
* away unexpectedly during a write.
*/
@Test
public void testWritePipelineFailure()
throws IOException, TimeoutException, InterruptedException {
getClusterBuilder().setNumDatanodes(1).build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
final FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
Path path = new Path("/" + METHOD_NAME + ".dat");
EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE, LAZY_PERSIST);
// Write 1 byte to the file and kill the writer.
final FSDataOutputStream fos =
fs.create(path,
FsPermission.getFileDefault(),
createFlags,
BUFFER_LENGTH,
REPL_FACTOR,
BLOCK_SIZE,
null);
fos.write(new byte[1]);
fos.hsync();
DFSTestUtil.abortStream((DFSOutputStream) fos.getWrappedStream());
waitForLockedBytesUsed(fsd, osPageSize);
// Delete the file and ensure locked RAM goes to zero.
fs.delete(path, false);
DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
waitForLockedBytesUsed(fsd, 0);
}
/**
* Wait until used locked byte count goes to the expected value.
* @throws TimeoutException after 300 seconds.
*/
private void waitForLockedBytesUsed(final FsDatasetSpi<?> fsd,
final long expectedLockedBytes)
throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
long cacheUsed = fsd.getCacheUsed();
LOG.info("cacheUsed=" + cacheUsed + ", waiting for it to be " + expectedLockedBytes);
if (cacheUsed < 0) {
throw new IllegalStateException("cacheUsed unexpectedly negative");
}
return (cacheUsed == expectedLockedBytes);
}
}, 1000, 300000);
}
}

File: TestWriteToReplica.java

@ -204,7 +204,7 @@ public class TestWriteToReplica {
long available = v.getCapacity()-v.getDfsUsed();
long expectedLen = blocks[FINALIZED].getNumBytes();
try {
v.decDfsUsed(bpid, -available);
v.onBlockFileDeletion(bpid, -available);
blocks[FINALIZED].setNumBytes(expectedLen+100);
dataSet.append(blocks[FINALIZED], newGS, expectedLen);
Assert.fail("Should not have space to append to an RWR replica" + blocks[RWR]);
@ -212,7 +212,7 @@ public class TestWriteToReplica {
Assert.assertTrue(e.getMessage().startsWith(
"Insufficient space for appending to "));
}
v.decDfsUsed(bpid, available);
v.onBlockFileDeletion(bpid, available);
blocks[FINALIZED].setNumBytes(expectedLen);
newGS = blocks[RBW].getGenerationStamp()+1;