HDFS-7112. LazyWriter should use either async IO or one thread per physical disk. Contributed by Xiaoyu Yao.

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
This commit is contained in:
cnauroth 2014-10-07 20:25:19 -07:00 committed by Jitendra Pandey
parent ae8c9cdb18
commit 69aacf19c1
8 changed files with 506 additions and 85 deletions

View File

@ -619,6 +619,101 @@ Release 2.6.0 - UNRELEASED
HDFS-5089. When a LayoutVersion support SNAPSHOT, it must support HDFS-5089. When a LayoutVersion support SNAPSHOT, it must support
FSIMAGE_NAME_OPTIMIZATION. (szetszwo) FSIMAGE_NAME_OPTIMIZATION. (szetszwo)
BREAKDOWN OF HDFS-6581 SUBTASKS AND RELATED JIRAS
HDFS-6921. Add LazyPersist flag to FileStatus. (Arpit Agarwal)
HDFS-6924. Add new RAM_DISK storage type. (Arpit Agarwal)
HDFS-6922. Add LazyPersist flag to INodeFile, save it in FsImage and
edit logs. (Arpit Agarwal)
HDFS-6923. Propagate LazyPersist flag to DNs via DataTransferProtocol.
(Arpit Agarwal)
HDFS-6925. DataNode should attempt to place replicas on transient storage
first if lazyPersist flag is received. (Arpit Agarwal)
HDFS-6926. DN support for saving replicas to persistent storage and
evicting in-memory replicas. (Arpit Agarwal)
HDFS-6927. Initial unit tests for lazy persist files. (Arpit Agarwal)
HDFS-6929. NN periodically unlinks lazy persist files with missing
replicas from namespace. (Arpit Agarwal)
HDFS-6928. 'hdfs put' command should accept lazyPersist flag for testing.
(Arpit Agarwal)
HDFS-6960. Bugfix in LazyWriter, fix test case and some refactoring.
(Arpit Agarwal)
HDFS-6931. Move lazily persisted replicas to finalized directory on DN
startup. (Arpit Agarwal)
HDFS-6950. Add Additional unit tests for HDFS-6581. (Xiaoyu Yao via
Arpit Agarwal)
HDFS-6930. Improve replica eviction from RAM disk. (Arpit Agarwal)
HDFS-6977. Delete all copies when a block is deleted from the block space.
(Arpit Agarwal)
HDFS-6991. Notify NN of evicted block before deleting it from RAM disk.
(Arpit Agarwal)
HDFS-6978. Directory scanner should correctly reconcile blocks on RAM
disk. (Arpit Agarwal)
HDFS-7066. LazyWriter#evictBlocks misses a null check for replicaState.
(Xiaoyu Yao via Arpit Agarwal)
HDFS-7064. Fix unit test failures in HDFS-6581 branch. (Xiaoyu Yao via
Arpit Agarwal)
HDFS-6581. Few more unit test fixes for HDFS-6581. (Arpit Agarwal)
HDFS-7080. Fix finalize and upgrade unit test failures. (Arpit Agarwal)
HDFS-7084. FsDatasetImpl#copyBlockFiles debug log can be improved.
(Xiaoyu Yao via Arpit Agarwal)
HDFS-7091. Add forwarding constructor for INodeFile for existing callers.
(Arpit Agarwal)
HDFS-7100. Make eviction scheme pluggable. (Arpit Agarwal)
HDFS-7108. Fix unit test failures in SimulatedFsDataset. (Arpit Agarwal)
HDFS-6990. Add unit test for evict/delete RAM_DISK block with open
handle. (Xiaoyu Yao via Arpit Agarwal)
HDFS-7143. Fix findbugs warnings in HDFS-6581 branch. (szetszwo via
Arpit Agarwal)
HDFS-6932. Balancer and Mover tools should ignore replicas on RAM_DISK.
(Xiaoyu Yao via Arpit Agarwal)
HDFS-7144. Fix findbugs warnings in RamDiskReplicaTracker. (szetszwo via
Arpit Agarwal)
HDFS-7155. Bugfix in createLocatedFileStatus caused by bad merge.
(Arpit Agarwal)
HDFS-7153. Add storagePolicy to NN edit log during file creation.
(Arpit Agarwal)
HDFS-7159. Use block storage policy to set lazy persist preference.
(Arpit Agarwal)
HDFS-7129. Metrics to track usage of memory for writes. (Xiaoyu Yao
via Arpit Agarwal)
HDFS-7171. Fix Jenkins failures in HDFS-6581 branch. (Arpit Agarwal)
HDFS-7112. LazyWriter should use either async IO or one thread per physical
disk. (Xiaoyu Yao via cnauroth)
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
HDFS-6387. HDFS CLI admin tool for creating & deleting an HDFS-6387. HDFS CLI admin tool for creating & deleting an

View File

@ -42,6 +42,7 @@
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface; import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@ -463,5 +464,15 @@ public HdfsBlocksMetadata getHdfsBlocksMetadata(String bpid,
public void submitBackgroundSyncFileRangeRequest(final ExtendedBlock block, public void submitBackgroundSyncFileRangeRequest(final ExtendedBlock block,
final FileDescriptor fd, final long offset, final long nbytes, final FileDescriptor fd, final long offset, final long nbytes,
final int flags); final int flags);
}
/**
* Callback from RamDiskAsyncLazyPersistService upon async lazy persist task end
*/
public void onCompleteLazyPersist(String bpId, long blockId,
long creationTime, File[] savedFiles, FsVolumeImpl targetVolume);
/**
* Callback from RamDiskAsyncLazyPersistService upon async lazy persist task fail
*/
public void onFailLazyPersist(String bpId, long blockId);
}

View File

@ -171,6 +171,10 @@ void decDfsUsed(long value) {
long getDfsUsed() throws IOException { long getDfsUsed() throws IOException {
return dfsUsage.getUsed(); return dfsUsage.getUsed();
} }
void incDfsUsed(long value) {
dfsUsage.incDfsUsed(value);
}
/** /**
* Read in the cached DU value and return it if it is less than 600 seconds * Read in the cached DU value and return it if it is less than 600 seconds
@ -276,23 +280,6 @@ File addBlock(Block b, File f) throws IOException {
return blockFile; return blockFile;
} }
/**
* Save the given replica to persistent storage.
*
* @return The saved meta and block files, in that order.
* @throws IOException
*/
File[] lazyPersistReplica(long blockId, long genStamp,
File srcMeta, File srcFile) throws IOException {
if (!lazypersistDir.exists() && !lazypersistDir.mkdirs()) {
FsDatasetImpl.LOG.warn("Failed to create " + lazypersistDir);
}
File targetFiles[] = FsDatasetImpl.copyBlockFiles(
blockId, genStamp, srcMeta, srcFile, lazypersistDir);
dfsUsage.incDfsUsed(targetFiles[0].length() + targetFiles[1].length());
return targetFiles;
}
/** /**
* Move a persisted replica from lazypersist directory to a subdirectory * Move a persisted replica from lazypersist directory to a subdirectory
* under finalized. * under finalized.

View File

@ -220,6 +220,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
final ReplicaMap volumeMap; final ReplicaMap volumeMap;
final RamDiskReplicaTracker ramDiskReplicaTracker; final RamDiskReplicaTracker ramDiskReplicaTracker;
final RamDiskAsyncLazyPersistService asyncLazyPersistService;
private static final int MAX_BLOCK_EVICTIONS_PER_ITERATION = 3; private static final int MAX_BLOCK_EVICTIONS_PER_ITERATION = 3;
@ -273,10 +274,12 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
VolumeChoosingPolicy.class), conf); VolumeChoosingPolicy.class), conf);
volumes = new FsVolumeList(volsFailed, blockChooserImpl); volumes = new FsVolumeList(volsFailed, blockChooserImpl);
asyncDiskService = new FsDatasetAsyncDiskService(datanode); asyncDiskService = new FsDatasetAsyncDiskService(datanode);
asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode);
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) { for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
addVolume(dataLocations, storage.getStorageDir(idx)); addVolume(dataLocations, storage.getStorageDir(idx));
} }
setupAsyncLazyPersistThreads();
cacheManager = new FsDatasetCache(this); cacheManager = new FsDatasetCache(this);
@ -409,6 +412,8 @@ public void run() {
} }
} }
setupAsyncLazyPersistThreads();
for (int i = 0; i < volumes.size(); i++) { for (int i = 0; i < volumes.size(); i++) {
if (successFlags[i]) { if (successFlags[i]) {
succeedVolumes.add(volumes.get(i)); succeedVolumes.add(volumes.get(i));
@ -462,6 +467,7 @@ public synchronized void removeVolumes(Collection<StorageLocation> volumes) {
storageMap.remove(sd.getStorageUuid()); storageMap.remove(sd.getStorageUuid());
} }
} }
setupAsyncLazyPersistThreads();
} }
private StorageType getStorageTypeFromLocations( private StorageType getStorageTypeFromLocations(
@ -1506,10 +1512,11 @@ public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
RamDiskReplica replicaInfo = RamDiskReplica replicaInfo =
ramDiskReplicaTracker.getReplica(bpid, invalidBlks[i].getBlockId()); ramDiskReplicaTracker.getReplica(bpid, invalidBlks[i].getBlockId());
if (replicaInfo != null) { if (replicaInfo != null) {
if (replicaInfo.getIsPersisted() == false) { if (!replicaInfo.getIsPersisted()) {
datanode.getMetrics().incrRamDiskBlocksDeletedBeforeLazyPersisted(); datanode.getMetrics().incrRamDiskBlocksDeletedBeforeLazyPersisted();
} }
discardRamDiskReplica(replicaInfo, true); ramDiskReplicaTracker.discardReplica(replicaInfo.getBlockPoolId(),
replicaInfo.getBlockId(), true);
} }
} }
@ -1750,6 +1757,10 @@ public void shutdown() {
if (asyncDiskService != null) { if (asyncDiskService != null) {
asyncDiskService.shutdown(); asyncDiskService.shutdown();
} }
if (asyncLazyPersistService != null) {
asyncLazyPersistService.shutdown();
}
if(volumes != null) { if(volumes != null) {
volumes.shutdown(); volumes.shutdown();
@ -2308,6 +2319,40 @@ public RollingLogs createRollingLogs(String bpid, String prefix
return new RollingLogsImpl(dir, prefix); return new RollingLogsImpl(dir, prefix);
} }
@Override
public void onCompleteLazyPersist(String bpId, long blockId,
long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) {
synchronized (FsDatasetImpl.this) {
ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles);
targetVolume.incDfsUsed(bpId,
savedFiles[0].length() + savedFiles[1].length());
// Update metrics (ignore the metadata file size)
datanode.getMetrics().incrRamDiskBlocksLazyPersisted();
datanode.getMetrics().incrRamDiskBytesLazyPersisted(savedFiles[1].length());
datanode.getMetrics().addRamDiskBlocksLazyPersistWindowMs(
Time.monotonicNow() - creationTime);
if (LOG.isDebugEnabled()) {
LOG.debug("LazyWriter: Finish persisting RamDisk block: "
+ " block pool Id: " + bpId + " block id: " + blockId
+ " to block file " + savedFiles[1] + " and meta file " + savedFiles[0]
+ " on target volume " + targetVolume);
}
}
}
@Override
public void onFailLazyPersist(String bpId, long blockId) {
RamDiskReplica block = null;
block = ramDiskReplicaTracker.getReplica(bpId, blockId);
if (block != null) {
LOG.warn("Failed to save replica " + block + ". re-enqueueing it.");
ramDiskReplicaTracker.reenqueueReplicaNotPersisted(block);
}
}
@Override @Override
public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block, public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
FileDescriptor fd, long offset, long nbytes, int flags) { FileDescriptor fd, long offset, long nbytes, int flags) {
@ -2316,9 +2361,38 @@ public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
nbytes, flags); nbytes, flags);
} }
void discardRamDiskReplica(RamDiskReplica replica, boolean deleteSavedCopies) { private boolean ramDiskConfigured() {
ramDiskReplicaTracker.discardReplica(replica.getBlockPoolId(), for (FsVolumeImpl v: getVolumes()){
replica.getBlockId(), deleteSavedCopies); if (v.isTransientStorage()) {
return true;
}
}
return false;
}
// Add/Remove per DISK volume async lazy persist thread when RamDisk volume is
// added or removed.
// This should only be called when the FsDataSetImpl#volumes list is finalized.
private void setupAsyncLazyPersistThreads() {
boolean ramDiskConfigured = ramDiskConfigured();
for (FsVolumeImpl v: getVolumes()){
// Skip transient volumes
if (v.isTransientStorage()) {
continue;
}
// Add thread for DISK volume if RamDisk is configured
if (ramDiskConfigured &&
!asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
asyncLazyPersistService.addVolume(v.getCurrentDir());
}
// Remove thread for DISK volume if RamDisk is not configured
if (!ramDiskConfigured &&
asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
asyncLazyPersistService.removeVolume(v.getCurrentDir());
}
}
} }
class LazyWriter implements Runnable { class LazyWriter implements Runnable {
@ -2344,61 +2418,6 @@ public LazyWriter(Configuration conf) {
DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS_DEFAULT); DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS_DEFAULT);
} }
private void moveReplicaToNewVolume(String bpid, long blockId, long creationTime)
throws IOException {
FsVolumeImpl targetVolume;
ReplicaInfo replicaInfo;
BlockPoolSlice bpSlice;
File srcFile, srcMeta;
long genStamp;
synchronized (FsDatasetImpl.this) {
replicaInfo = volumeMap.get(bpid, blockId);
if (replicaInfo == null || !replicaInfo.getVolume().isTransientStorage()) {
// The block was either deleted before it could be checkpointed or
// it is already on persistent storage. This can occur if a second
// replica on persistent storage was found after the lazy write was
// scheduled.
return;
}
// Pick a target volume for the block.
targetVolume = volumes.getNextVolume(
StorageType.DEFAULT, replicaInfo.getNumBytes());
if (LOG.isDebugEnabled()) {
LOG.debug("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid);
}
ramDiskReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
bpSlice = targetVolume.getBlockPoolSlice(bpid);
srcMeta = replicaInfo.getMetaFile();
srcFile = replicaInfo.getBlockFile();
genStamp = replicaInfo.getGenerationStamp();
}
// Drop the FsDatasetImpl lock for the file copy.
File[] savedFiles =
bpSlice.lazyPersistReplica(blockId, genStamp, srcMeta, srcFile);
synchronized (FsDatasetImpl.this) {
ramDiskReplicaTracker.recordEndLazyPersist(bpid, blockId, savedFiles);
// Update metrics (ignore the metadata file size)
datanode.getMetrics().incrRamDiskBlocksLazyPersisted();
datanode.getMetrics().incrRamDiskBytesLazyPersisted(replicaInfo.getNumBytes());
datanode.getMetrics().addRamDiskBlocksLazyPersistWindowMs(
Time.monotonicNow() - creationTime);
if (LOG.isDebugEnabled()) {
LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
" to file " + savedFiles[1]);
}
}
}
/** /**
* Checkpoint a pending replica to persistent storage now. * Checkpoint a pending replica to persistent storage now.
* If we fail then move the replica to the end of the queue. * If we fail then move the replica to the end of the queue.
@ -2406,13 +2425,43 @@ private void moveReplicaToNewVolume(String bpid, long blockId, long creationTime
*/ */
private boolean saveNextReplica() { private boolean saveNextReplica() {
RamDiskReplica block = null; RamDiskReplica block = null;
FsVolumeImpl targetVolume;
ReplicaInfo replicaInfo;
boolean succeeded = false; boolean succeeded = false;
try { try {
block = ramDiskReplicaTracker.dequeueNextReplicaToPersist(); block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
if (block != null) { if (block != null) {
moveReplicaToNewVolume(block.getBlockPoolId(), block.getBlockId(), synchronized (FsDatasetImpl.this) {
block.getCreationTime()); replicaInfo = volumeMap.get(block.getBlockPoolId(), block.getBlockId());
// If replicaInfo is null, the block was either deleted before
// it could be checkpointed or it is already on persistent storage.
// This can occur if a second replica on persistent storage was found
// after the lazy write was scheduled.
if (replicaInfo != null &&
replicaInfo.getVolume().isTransientStorage()) {
// Pick a target volume to persist the block.
targetVolume = volumes.getNextVolume(
StorageType.DEFAULT, replicaInfo.getNumBytes());
ramDiskReplicaTracker.recordStartLazyPersist(
block.getBlockPoolId(), block.getBlockId(), targetVolume);
if (LOG.isDebugEnabled()) {
LOG.debug("LazyWriter: Start persisting RamDisk block:"
+ " block pool Id: " + block.getBlockPoolId()
+ " block id: " + block.getBlockId()
+ " on target volume " + targetVolume);
}
asyncLazyPersistService.submitLazyPersistTask(
block.getBlockPoolId(), block.getBlockId(),
replicaInfo.getGenerationStamp(), block.getCreationTime(),
replicaInfo.getMetaFile(), replicaInfo.getBlockFile(),
targetVolume);
}
}
} }
succeeded = true; succeeded = true;
} catch(IOException ioe) { } catch(IOException ioe) {
@ -2420,10 +2469,9 @@ private boolean saveNextReplica() {
} finally { } finally {
if (!succeeded && block != null) { if (!succeeded && block != null) {
LOG.warn("Failed to save replica " + block + ". re-enqueueing it."); LOG.warn("Failed to save replica " + block + ". re-enqueueing it.");
ramDiskReplicaTracker.reenqueueReplicaNotPersisted(block); onFailLazyPersist(block.getBlockPoolId(), block.getBlockId());
} }
} }
return succeeded; return succeeded;
} }
@ -2480,7 +2528,8 @@ private void evictBlocks() throws IOException {
metaFile = replicaInfo.getMetaFile(); metaFile = replicaInfo.getMetaFile();
blockFileUsed = blockFile.length(); blockFileUsed = blockFile.length();
metaFileUsed = metaFile.length(); metaFileUsed = metaFile.length();
discardRamDiskReplica(replicaState, false); ramDiskReplicaTracker.discardReplica(replicaState.getBlockPoolId(),
replicaState.getBlockId(), false);
// Move the replica from lazyPersist/ to finalized/ on target volume // Move the replica from lazyPersist/ to finalized/ on target volume
BlockPoolSlice bpSlice = BlockPoolSlice bpSlice =

View File

@ -124,7 +124,11 @@ File getCurrentDir() {
File getRbwDir(String bpid) throws IOException { File getRbwDir(String bpid) throws IOException {
return getBlockPoolSlice(bpid).getRbwDir(); return getBlockPoolSlice(bpid).getRbwDir();
} }
File getLazyPersistDir(String bpid) throws IOException {
return getBlockPoolSlice(bpid).getLazypersistDir();
}
void decDfsUsed(String bpid, long value) { void decDfsUsed(String bpid, long value) {
synchronized(dataset) { synchronized(dataset) {
BlockPoolSlice bp = bpSlices.get(bpid); BlockPoolSlice bp = bpSlices.get(bpid);
@ -134,6 +138,15 @@ void decDfsUsed(String bpid, long value) {
} }
} }
void incDfsUsed(String bpid, long value) {
synchronized(dataset) {
BlockPoolSlice bp = bpSlices.get(bpid);
if (bp != null) {
bp.incDfsUsed(value);
}
}
}
long getDfsUsed() throws IOException { long getDfsUsed() throws IOException {
long dfsUsed = 0; long dfsUsed = 0;
synchronized(dataset) { synchronized(dataset) {

View File

@ -0,0 +1,252 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* This class is a container of multiple thread pools, one for each non-RamDisk
* volume with a maximum thread count of 1 so that we can schedule async lazy
* persist operations easily with volume arrival and departure handled.
*
* This class and {@link org.apache.hadoop.util.AsyncDiskService} are similar.
* They should be combined.
*/
class RamDiskAsyncLazyPersistService {
public static final Log LOG = LogFactory.getLog(RamDiskAsyncLazyPersistService.class);
// ThreadPool core pool size
private static final int CORE_THREADS_PER_VOLUME = 1;
// ThreadPool maximum pool size
private static final int MAXIMUM_THREADS_PER_VOLUME = 1;
// ThreadPool keep-alive time for threads over core pool size
private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
private final DataNode datanode;
private final ThreadGroup threadGroup;
private Map<File, ThreadPoolExecutor> executors
= new HashMap<File, ThreadPoolExecutor>();
/**
* Create a RamDiskAsyncLazyPersistService with a set of volumes (specified by their
* root directories).
*
* The RamDiskAsyncLazyPersistService uses one ThreadPool per volume to do the async
* disk operations.
*/
RamDiskAsyncLazyPersistService(DataNode datanode) {
this.datanode = datanode;
this.threadGroup = new ThreadGroup(getClass().getSimpleName());
}
private void addExecutorForVolume(final File volume) {
ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(threadGroup, r);
t.setName("Async RamDisk lazy persist worker for volume " + volume);
return t;
}
};
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME,
THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), threadFactory);
// This can reduce the number of running threads
executor.allowCoreThreadTimeOut(true);
executors.put(volume, executor);
}
/**
* Starts AsyncLazyPersistService for a new volume
* @param volume the root of the new data volume.
*/
synchronized void addVolume(File volume) {
if (executors == null) {
throw new RuntimeException("AsyncLazyPersistService is already shutdown");
}
ThreadPoolExecutor executor = executors.get(volume);
if (executor != null) {
throw new RuntimeException("Volume " + volume + " is already existed.");
}
addExecutorForVolume(volume);
}
/**
* Stops AsyncLazyPersistService for a volume.
* @param volume the root of the volume.
*/
synchronized void removeVolume(File volume) {
if (executors == null) {
throw new RuntimeException("AsyncDiskService is already shutdown");
}
ThreadPoolExecutor executor = executors.get(volume);
if (executor == null) {
throw new RuntimeException("Can not find volume " + volume
+ " to remove.");
} else {
executor.shutdown();
executors.remove(volume);
}
}
/**
* Query if the thread pool exist for the volume
* @param volume the root of a volume
* @return true if there is one thread pool for the volume
* false otherwise
*/
synchronized boolean queryVolume(File volume) {
if (executors == null) {
throw new RuntimeException("AsyncLazyPersistService is already shutdown");
}
ThreadPoolExecutor executor = executors.get(volume);
return (executor != null);
}
/**
* Execute the task sometime in the future, using ThreadPools.
*/
synchronized void execute(File root, Runnable task) {
if (executors == null) {
throw new RuntimeException("AsyncLazyPersistService is already shutdown");
}
ThreadPoolExecutor executor = executors.get(root);
if (executor == null) {
throw new RuntimeException("Cannot find root " + root
+ " for execution of task " + task);
} else {
executor.execute(task);
}
}
/**
* Gracefully shut down all ThreadPool. Will wait for all lazy persist
* tasks to finish.
*/
synchronized void shutdown() {
if (executors == null) {
LOG.warn("AsyncLazyPersistService has already shut down.");
} else {
LOG.info("Shutting down all async lazy persist service threads");
for (Map.Entry<File, ThreadPoolExecutor> e : executors.entrySet()) {
e.getValue().shutdown();
}
// clear the executor map so that calling execute again will fail.
executors = null;
LOG.info("All async lazy persist service threads have been shut down");
}
}
/**
* Asynchronously lazy persist the block from the RamDisk to Disk.
*/
void submitLazyPersistTask(String bpId, long blockId,
long genStamp, long creationTime,
File metaFile, File blockFile,
FsVolumeImpl targetVolume) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("LazyWriter schedule async task to persist RamDisk block pool id: "
+ bpId + " block id: " + blockId);
}
File lazyPersistDir = targetVolume.getLazyPersistDir(bpId);
if (!lazyPersistDir.exists() && !lazyPersistDir.mkdirs()) {
FsDatasetImpl.LOG.warn("LazyWriter failed to create " + lazyPersistDir);
throw new IOException("LazyWriter fail to find or create lazy persist dir: "
+ lazyPersistDir.toString());
}
ReplicaLazyPersistTask lazyPersistTask = new ReplicaLazyPersistTask(
bpId, blockId, genStamp, creationTime, blockFile, metaFile,
targetVolume, lazyPersistDir);
execute(targetVolume.getCurrentDir(), lazyPersistTask);
}
class ReplicaLazyPersistTask implements Runnable {
final String bpId;
final long blockId;
final long genStamp;
final long creationTime;
final File blockFile;
final File metaFile;
final FsVolumeImpl targetVolume;
final File lazyPersistDir;
ReplicaLazyPersistTask(String bpId, long blockId,
long genStamp, long creationTime,
File blockFile, File metaFile,
FsVolumeImpl targetVolume, File lazyPersistDir) {
this.bpId = bpId;
this.blockId = blockId;
this.genStamp = genStamp;
this.creationTime = creationTime;
this.blockFile = blockFile;
this.metaFile = metaFile;
this.targetVolume = targetVolume;
this.lazyPersistDir = lazyPersistDir;
}
@Override
public String toString() {
// Called in AsyncLazyPersistService.execute for displaying error messages.
return "LazyWriter async task of persist RamDisk block pool id:"
+ bpId + " block pool id: "
+ blockId + " with block file " + blockFile
+ " and meta file " + metaFile + " to target volume " + targetVolume;}
@Override
public void run() {
boolean succeeded = false;
try {
// No FsDatasetImpl lock for the file copy
File targetFiles[] = FsDatasetImpl.copyBlockFiles(
blockId, genStamp, metaFile, blockFile, lazyPersistDir);
// Lock FsDataSetImpl during onCompleteLazyPersist callback
datanode.getFSDataset().onCompleteLazyPersist(bpId, blockId,
creationTime, targetFiles, targetVolume);
succeeded = true;
} catch (Exception e){
FsDatasetImpl.LOG.warn(
"LazyWriter failed to async persist RamDisk block pool id: "
+ bpId + "block Id: " + blockId);
} finally {
if (!succeeded) {
datanode.getFSDataset().onFailLazyPersist(bpId, blockId);
}
}
}
}
}

View File

@ -49,6 +49,7 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs; import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@ -1209,5 +1210,16 @@ public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
FileDescriptor fd, long offset, long nbytes, int flags) { FileDescriptor fd, long offset, long nbytes, int flags) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public void onCompleteLazyPersist(String bpId, long blockId,
long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) {
throw new UnsupportedOperationException();
}
@Override
public void onFailLazyPersist(String bpId, long blockId) {
throw new UnsupportedOperationException();
}
} }

View File

@ -971,7 +971,9 @@ JMXGet initJMX() throws Exception
void printRamDiskJMXMetrics() { void printRamDiskJMXMetrics() {
try { try {
jmx.printAllMatchedAttributes(JMX_RAM_DISK_METRICS_PATTERN); if (jmx != null) {
jmx.printAllMatchedAttributes(JMX_RAM_DISK_METRICS_PATTERN);
}
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }