HDFS-7112. LazyWriter should use either async IO or one thread per physical disk. Contributed by Xiaoyu Yao.

Commit 1efd9c9825 by cnauroth, 2014-10-07 20:25:19 -07:00 (parent 60f0f6fcde)
8 changed files with 413 additions and 85 deletions

File: CHANGES.txt

@@ -357,6 +357,8 @@ Trunk (Unreleased)
    HDFS-7171. Fix Jenkins failures in HDFS-6581 branch. (Arpit Agarwal)

+    HDFS-7112. LazyWriter should use either async IO or one thread per physical
+    disk. (Xiaoyu Yao via cnauroth)

Release 2.7.0 - UNRELEASED

File: FsDatasetSpi.java

@@ -42,6 +42,7 @@
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -463,5 +464,15 @@ public HdfsBlocksMetadata getHdfsBlocksMetadata(String bpid,
  public void submitBackgroundSyncFileRangeRequest(final ExtendedBlock block,
      final FileDescriptor fd, final long offset, final long nbytes,
      final int flags);
+
+  /**
+   * Callback from RamDiskAsyncLazyPersistService upon async lazy persist task end
+   */
+  public void onCompleteLazyPersist(String bpId, long blockId,
+      long creationTime, File[] savedFiles, FsVolumeImpl targetVolume);
+
+  /**
+   * Callback from RamDiskAsyncLazyPersistService upon async lazy persist task fail
+   */
+  public void onFailLazyPersist(String bpId, long blockId);
}
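To make the new contract concrete, here is a minimal, self-contained sketch (not part of the patch) of how an async persist task is expected to drive these two callbacks: exactly one of them fires per task, with onFailLazyPersist reached from the finally block on any error. The interface and task names below are illustrative stand-ins for FsDatasetSpi and the real task class, not HDFS code.

import java.io.File;

interface LazyPersistCallbacks {  // stand-in for the FsDatasetSpi additions
  void onCompleteLazyPersist(String bpId, long blockId, long creationTime,
      File[] savedFiles, Object targetVolume);
  void onFailLazyPersist(String bpId, long blockId);
}

class ExamplePersistTask implements Runnable {
  private final LazyPersistCallbacks dataset;
  private final String bpId;
  private final long blockId;
  private final long creationTime;

  ExamplePersistTask(LazyPersistCallbacks dataset, String bpId,
      long blockId, long creationTime) {
    this.dataset = dataset;
    this.bpId = bpId;
    this.blockId = blockId;
    this.creationTime = creationTime;
  }

  @Override
  public void run() {
    boolean succeeded = false;
    try {
      // Copy the block and meta file to persistent storage (stubbed here).
      File[] saved = { new File(blockId + ".meta"), new File(blockId + ".blk") };
      dataset.onCompleteLazyPersist(bpId, blockId, creationTime, saved, null);
      succeeded = true;
    } finally {
      // Any failure above skips the success callback; report it exactly once.
      if (!succeeded) {
        dataset.onFailLazyPersist(bpId, blockId);
      }
    }
  }
}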

File: BlockPoolSlice.java

@@ -166,6 +166,10 @@ long getDfsUsed() throws IOException {
    return dfsUsage.getUsed();
  }

+  void incDfsUsed(long value) {
+    dfsUsage.incDfsUsed(value);
+  }
+
  /**
   * Read in the cached DU value and return it if it is less than 600 seconds
   * old (DU update interval). Slight imprecision of dfsUsed is not critical
@@ -270,23 +274,6 @@ File addFinalizedBlock(Block b, File f) throws IOException {
    return blockFile;
  }

-  /**
-   * Save the given replica to persistent storage.
-   *
-   * @return The saved meta and block files, in that order.
-   * @throws IOException
-   */
-  File[] lazyPersistReplica(long blockId, long genStamp,
-      File srcMeta, File srcFile) throws IOException {
-    if (!lazypersistDir.exists() && !lazypersistDir.mkdirs()) {
-      FsDatasetImpl.LOG.warn("Failed to create " + lazypersistDir);
-    }
-    File targetFiles[] = FsDatasetImpl.copyBlockFiles(
-        blockId, genStamp, srcMeta, srcFile, lazypersistDir);
-    dfsUsage.incDfsUsed(targetFiles[0].length() + targetFiles[1].length());
-    return targetFiles;
-  }
-
  /**
   * Move a persisted replica from lazypersist directory to a subdirectory
   * under finalized.

File: FsDatasetImpl.java

@@ -219,6 +219,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
  final ReplicaMap volumeMap;
  final RamDiskReplicaTracker ramDiskReplicaTracker;
+  final RamDiskAsyncLazyPersistService asyncLazyPersistService;

  private static final int MAX_BLOCK_EVICTIONS_PER_ITERATION = 3;
@@ -272,10 +273,12 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
        VolumeChoosingPolicy.class), conf);
    volumes = new FsVolumeList(volsFailed, blockChooserImpl);
    asyncDiskService = new FsDatasetAsyncDiskService(datanode);
+    asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode);

    for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
      addVolume(dataLocations, storage.getStorageDir(idx));
    }
+    setupAsyncLazyPersistThreads();

    cacheManager = new FsDatasetCache(this);
@@ -408,6 +411,8 @@ public void run() {
      }
    }

+    setupAsyncLazyPersistThreads();
+
    for (int i = 0; i < volumes.size(); i++) {
      if (successFlags[i]) {
        succeedVolumes.add(volumes.get(i));
@@ -461,6 +466,7 @@ public synchronized void removeVolumes(Collection<StorageLocation> volumes) {
        storageMap.remove(sd.getStorageUuid());
      }
    }
+    setupAsyncLazyPersistThreads();
  }

  private StorageType getStorageTypeFromLocations(
@@ -1505,10 +1511,11 @@ public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
      RamDiskReplica replicaInfo =
          ramDiskReplicaTracker.getReplica(bpid, invalidBlks[i].getBlockId());
      if (replicaInfo != null) {
-        if (replicaInfo.getIsPersisted() == false) {
+        if (!replicaInfo.getIsPersisted()) {
          datanode.getMetrics().incrRamDiskBlocksDeletedBeforeLazyPersisted();
        }
-        discardRamDiskReplica(replicaInfo, true);
+        ramDiskReplicaTracker.discardReplica(replicaInfo.getBlockPoolId(),
+            replicaInfo.getBlockId(), true);
      }
    }
@@ -1750,6 +1757,10 @@ public void shutdown() {
      asyncDiskService.shutdown();
    }

+    if (asyncLazyPersistService != null) {
+      asyncLazyPersistService.shutdown();
+    }
+
    if(volumes != null) {
      volumes.shutdown();
    }
@@ -2307,6 +2318,40 @@ public RollingLogs createRollingLogs(String bpid, String prefix
    return new RollingLogsImpl(dir, prefix);
  }

+  @Override
+  public void onCompleteLazyPersist(String bpId, long blockId,
+      long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) {
+    synchronized (FsDatasetImpl.this) {
+      ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles);
+
+      targetVolume.incDfsUsed(bpId,
+          savedFiles[0].length() + savedFiles[1].length());
+
+      // Update metrics (ignore the metadata file size)
+      datanode.getMetrics().incrRamDiskBlocksLazyPersisted();
+      datanode.getMetrics().incrRamDiskBytesLazyPersisted(savedFiles[1].length());
+      datanode.getMetrics().addRamDiskBlocksLazyPersistWindowMs(
+          Time.monotonicNow() - creationTime);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("LazyWriter: Finish persisting RamDisk block: "
+            + " block pool Id: " + bpId + " block id: " + blockId
+            + " to block file " + savedFiles[1] + " and meta file " + savedFiles[0]
+            + " on target volume " + targetVolume);
+      }
+    }
+  }
+
+  @Override
+  public void onFailLazyPersist(String bpId, long blockId) {
+    RamDiskReplica block = null;
+    block = ramDiskReplicaTracker.getReplica(bpId, blockId);
+    if (block != null) {
+      LOG.warn("Failed to save replica " + block + ". re-enqueueing it.");
+      ramDiskReplicaTracker.reenqueueReplicaNotPersisted(block);
+    }
+  }
+
  @Override
  public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
      FileDescriptor fd, long offset, long nbytes, int flags) {
@@ -2315,9 +2360,38 @@ public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
        nbytes, flags);
  }

-  void discardRamDiskReplica(RamDiskReplica replica, boolean deleteSavedCopies) {
-    ramDiskReplicaTracker.discardReplica(replica.getBlockPoolId(),
-        replica.getBlockId(), deleteSavedCopies);
+  private boolean ramDiskConfigured() {
+    for (FsVolumeImpl v: getVolumes()){
+      if (v.isTransientStorage()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  // Add/Remove per DISK volume async lazy persist thread when RamDisk volume is
+  // added or removed.
+  // This should only be called when the FsDataSetImpl#volumes list is finalized.
+  private void setupAsyncLazyPersistThreads() {
+    boolean ramDiskConfigured = ramDiskConfigured();
+    for (FsVolumeImpl v: getVolumes()){
+      // Skip transient volumes
+      if (v.isTransientStorage()) {
+        continue;
+      }
+
+      // Add thread for DISK volume if RamDisk is configured
+      if (ramDiskConfigured &&
+          !asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
+        asyncLazyPersistService.addVolume(v.getCurrentDir());
+      }
+
+      // Remove thread for DISK volume if RamDisk is not configured
+      if (!ramDiskConfigured &&
+          asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
+        asyncLazyPersistService.removeVolume(v.getCurrentDir());
+      }
+    }
  }

  class LazyWriter implements Runnable {
@@ -2343,61 +2417,6 @@ public LazyWriter(Configuration conf) {
          DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS_DEFAULT);
    }

-    private void moveReplicaToNewVolume(String bpid, long blockId, long creationTime)
-        throws IOException {
-
-      FsVolumeImpl targetVolume;
-      ReplicaInfo replicaInfo;
-      BlockPoolSlice bpSlice;
-      File srcFile, srcMeta;
-      long genStamp;
-
-      synchronized (FsDatasetImpl.this) {
-        replicaInfo = volumeMap.get(bpid, blockId);
-
-        if (replicaInfo == null || !replicaInfo.getVolume().isTransientStorage()) {
-          // The block was either deleted before it could be checkpointed or
-          // it is already on persistent storage. This can occur if a second
-          // replica on persistent storage was found after the lazy write was
-          // scheduled.
-          return;
-        }
-
-        // Pick a target volume for the block.
-        targetVolume = volumes.getNextVolume(
-            StorageType.DEFAULT, replicaInfo.getNumBytes());
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid);
-        }
-
-        ramDiskReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
-        bpSlice = targetVolume.getBlockPoolSlice(bpid);
-        srcMeta = replicaInfo.getMetaFile();
-        srcFile = replicaInfo.getBlockFile();
-        genStamp = replicaInfo.getGenerationStamp();
-      }
-
-      // Drop the FsDatasetImpl lock for the file copy.
-      File[] savedFiles =
-          bpSlice.lazyPersistReplica(blockId, genStamp, srcMeta, srcFile);
-
-      synchronized (FsDatasetImpl.this) {
-        ramDiskReplicaTracker.recordEndLazyPersist(bpid, blockId, savedFiles);
-
-        // Update metrics (ignore the metadata file size)
-        datanode.getMetrics().incrRamDiskBlocksLazyPersisted();
-        datanode.getMetrics().incrRamDiskBytesLazyPersisted(replicaInfo.getNumBytes());
-        datanode.getMetrics().addRamDiskBlocksLazyPersistWindowMs(
-            Time.monotonicNow() - creationTime);
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
-              " to file " + savedFiles[1]);
-        }
-      }
-    }
-
    /**
     * Checkpoint a pending replica to persistent storage now.
     * If we fail then move the replica to the end of the queue.
@@ -2405,13 +2424,43 @@ private void moveReplicaToNewVolume(String bpid, long blockId, long creationTime
     */
    private boolean saveNextReplica() {
      RamDiskReplica block = null;
+      FsVolumeImpl targetVolume;
+      ReplicaInfo replicaInfo;
      boolean succeeded = false;

      try {
        block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
        if (block != null) {
-          moveReplicaToNewVolume(block.getBlockPoolId(), block.getBlockId(),
-              block.getCreationTime());
+          synchronized (FsDatasetImpl.this) {
+            replicaInfo = volumeMap.get(block.getBlockPoolId(), block.getBlockId());
+
+            // If replicaInfo is null, the block was either deleted before
+            // it could be checkpointed or it is already on persistent storage.
+            // This can occur if a second replica on persistent storage was found
+            // after the lazy write was scheduled.
+            if (replicaInfo != null &&
+                replicaInfo.getVolume().isTransientStorage()) {
+              // Pick a target volume to persist the block.
+              targetVolume = volumes.getNextVolume(
+                  StorageType.DEFAULT, replicaInfo.getNumBytes());
+
+              ramDiskReplicaTracker.recordStartLazyPersist(
+                  block.getBlockPoolId(), block.getBlockId(), targetVolume);
+
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("LazyWriter: Start persisting RamDisk block:"
+                    + " block pool Id: " + block.getBlockPoolId()
+                    + " block id: " + block.getBlockId()
+                    + " on target volume " + targetVolume);
+              }
+
+              asyncLazyPersistService.submitLazyPersistTask(
+                  block.getBlockPoolId(), block.getBlockId(),
+                  replicaInfo.getGenerationStamp(), block.getCreationTime(),
+                  replicaInfo.getMetaFile(), replicaInfo.getBlockFile(),
+                  targetVolume);
+            }
+          }
        }
        succeeded = true;
      } catch(IOException ioe) {

@@ -2419,10 +2468,9 @@ private boolean saveNextReplica() {
      } finally {
        if (!succeeded && block != null) {
          LOG.warn("Failed to save replica " + block + ". re-enqueueing it.");
-          ramDiskReplicaTracker.reenqueueReplicaNotPersisted(block);
+          onFailLazyPersist(block.getBlockPoolId(), block.getBlockId());
        }
      }

      return succeeded;
    }
@@ -2479,7 +2527,8 @@ private void evictBlocks() throws IOException {
          metaFile = replicaInfo.getMetaFile();
          blockFileUsed = blockFile.length();
          metaFileUsed = metaFile.length();
-          discardRamDiskReplica(replicaState, false);
+          ramDiskReplicaTracker.discardReplica(replicaState.getBlockPoolId(),
+              replicaState.getBlockId(), false);

          // Move the replica from lazyPersist/ to finalized/ on target volume
          BlockPoolSlice bpSlice =
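The rewritten saveNextReplica above splits the work into three phases: bookkeeping under the FsDatasetImpl lock, the file copy on a per-volume worker thread with no lock held, and completion bookkeeping back under the lock via onCompleteLazyPersist. A minimal sketch of that lock-drop pattern using plain JDK types (the class and names here are illustrative, not HDFS API):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class LockDropPattern {
  private final Object datasetLock = new Object();
  private final ExecutorService worker = Executors.newSingleThreadExecutor();

  void persistAsync(final long blockId) {
    final String source;
    synchronized (datasetLock) {
      // Phase 1: record the start and snapshot what the copy needs.
      source = "ramdisk/block-" + blockId;
    }
    worker.execute(new Runnable() {
      @Override
      public void run() {
        // Phase 2: the slow copy runs without holding the dataset lock.
        String copied = source + " -> disk";
        synchronized (datasetLock) {
          // Phase 3: completion bookkeeping (tracker, dfsUsed, metrics).
          System.out.println("persisted " + copied);
        }
      }
    });
  }
}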

File: FsVolumeImpl.java

@@ -125,6 +125,10 @@ File getRbwDir(String bpid) throws IOException {
    return getBlockPoolSlice(bpid).getRbwDir();
  }

+  File getLazyPersistDir(String bpid) throws IOException {
+    return getBlockPoolSlice(bpid).getLazypersistDir();
+  }
+
  void decDfsUsed(String bpid, long value) {
    synchronized(dataset) {
      BlockPoolSlice bp = bpSlices.get(bpid);

@@ -134,6 +138,15 @@ void decDfsUsed(String bpid, long value) {
    }
  }

+  void incDfsUsed(String bpid, long value) {
+    synchronized(dataset) {
+      BlockPoolSlice bp = bpSlices.get(bpid);
+      if (bp != null) {
+        bp.incDfsUsed(value);
+      }
+    }
+  }
+
  long getDfsUsed() throws IOException {
    long dfsUsed = 0;
    synchronized(dataset) {

File: RamDiskAsyncLazyPersistService.java (new file)

@@ -0,0 +1,252 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.server.datanode.DataNode;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * This class is a container of multiple thread pools, one for each non-RamDisk
 * volume with a maximum thread count of 1 so that we can schedule async lazy
 * persist operations easily with volume arrival and departure handled.
 *
 * This class and {@link org.apache.hadoop.util.AsyncDiskService} are similar.
 * They should be combined.
 */
class RamDiskAsyncLazyPersistService {
  public static final Log LOG = LogFactory.getLog(RamDiskAsyncLazyPersistService.class);

  // ThreadPool core pool size
  private static final int CORE_THREADS_PER_VOLUME = 1;
  // ThreadPool maximum pool size
  private static final int MAXIMUM_THREADS_PER_VOLUME = 1;
  // ThreadPool keep-alive time for threads over core pool size
  private static final long THREADS_KEEP_ALIVE_SECONDS = 60;

  private final DataNode datanode;
  private final ThreadGroup threadGroup;
  private Map<File, ThreadPoolExecutor> executors
      = new HashMap<File, ThreadPoolExecutor>();

  /**
   * Create a RamDiskAsyncLazyPersistService with a set of volumes (specified by their
   * root directories).
   *
   * The RamDiskAsyncLazyPersistService uses one ThreadPool per volume to do the async
   * disk operations.
   */
  RamDiskAsyncLazyPersistService(DataNode datanode) {
    this.datanode = datanode;
    this.threadGroup = new ThreadGroup(getClass().getSimpleName());
  }

  private void addExecutorForVolume(final File volume) {
    ThreadFactory threadFactory = new ThreadFactory() {
      @Override
      public Thread newThread(Runnable r) {
        Thread t = new Thread(threadGroup, r);
        t.setName("Async RamDisk lazy persist worker for volume " + volume);
        return t;
      }
    };

    ThreadPoolExecutor executor = new ThreadPoolExecutor(
        CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME,
        THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(), threadFactory);

    // This can reduce the number of running threads
    executor.allowCoreThreadTimeOut(true);
    executors.put(volume, executor);
  }
  /**
   * Starts AsyncLazyPersistService for a new volume
   * @param volume the root of the new data volume.
   */
  synchronized void addVolume(File volume) {
    if (executors == null) {
      throw new RuntimeException("AsyncLazyPersistService is already shutdown");
    }
    ThreadPoolExecutor executor = executors.get(volume);
    if (executor != null) {
      throw new RuntimeException("Volume " + volume + " is already existed.");
    }
    addExecutorForVolume(volume);
  }

  /**
   * Stops AsyncLazyPersistService for a volume.
   * @param volume the root of the volume.
   */
  synchronized void removeVolume(File volume) {
    if (executors == null) {
      throw new RuntimeException("AsyncDiskService is already shutdown");
    }
    ThreadPoolExecutor executor = executors.get(volume);
    if (executor == null) {
      throw new RuntimeException("Can not find volume " + volume
          + " to remove.");
    } else {
      executor.shutdown();
      executors.remove(volume);
    }
  }

  /**
   * Query if the thread pool exist for the volume
   * @param volume the root of a volume
   * @return true if there is one thread pool for the volume
   *         false otherwise
   */
  synchronized boolean queryVolume(File volume) {
    if (executors == null) {
      throw new RuntimeException("AsyncLazyPersistService is already shutdown");
    }
    ThreadPoolExecutor executor = executors.get(volume);
    return (executor != null);
  }

  /**
   * Execute the task sometime in the future, using ThreadPools.
   */
  synchronized void execute(File root, Runnable task) {
    if (executors == null) {
      throw new RuntimeException("AsyncLazyPersistService is already shutdown");
    }
    ThreadPoolExecutor executor = executors.get(root);
    if (executor == null) {
      throw new RuntimeException("Cannot find root " + root
          + " for execution of task " + task);
    } else {
      executor.execute(task);
    }
  }

  /**
   * Gracefully shut down all ThreadPool. Will wait for all lazy persist
   * tasks to finish.
   */
  synchronized void shutdown() {
    if (executors == null) {
      LOG.warn("AsyncLazyPersistService has already shut down.");
    } else {
      LOG.info("Shutting down all async lazy persist service threads");
      for (Map.Entry<File, ThreadPoolExecutor> e : executors.entrySet()) {
        e.getValue().shutdown();
      }
      // clear the executor map so that calling execute again will fail.
      executors = null;
      LOG.info("All async lazy persist service threads have been shut down");
    }
  }
  /**
   * Asynchronously lazy persist the block from the RamDisk to Disk.
   */
  void submitLazyPersistTask(String bpId, long blockId,
      long genStamp, long creationTime,
      File metaFile, File blockFile,
      FsVolumeImpl targetVolume) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("LazyWriter schedule async task to persist RamDisk block pool id: "
          + bpId + " block id: " + blockId);
    }

    File lazyPersistDir = targetVolume.getLazyPersistDir(bpId);
    if (!lazyPersistDir.exists() && !lazyPersistDir.mkdirs()) {
      FsDatasetImpl.LOG.warn("LazyWriter failed to create " + lazyPersistDir);
      throw new IOException("LazyWriter fail to find or create lazy persist dir: "
          + lazyPersistDir.toString());
    }

    ReplicaLazyPersistTask lazyPersistTask = new ReplicaLazyPersistTask(
        bpId, blockId, genStamp, creationTime, blockFile, metaFile,
        targetVolume, lazyPersistDir);
    execute(targetVolume.getCurrentDir(), lazyPersistTask);
  }

  class ReplicaLazyPersistTask implements Runnable {
    final String bpId;
    final long blockId;
    final long genStamp;
    final long creationTime;
    final File blockFile;
    final File metaFile;
    final FsVolumeImpl targetVolume;
    final File lazyPersistDir;

    ReplicaLazyPersistTask(String bpId, long blockId,
        long genStamp, long creationTime,
        File blockFile, File metaFile,
        FsVolumeImpl targetVolume, File lazyPersistDir) {
      this.bpId = bpId;
      this.blockId = blockId;
      this.genStamp = genStamp;
      this.creationTime = creationTime;
      this.blockFile = blockFile;
      this.metaFile = metaFile;
      this.targetVolume = targetVolume;
      this.lazyPersistDir = lazyPersistDir;
    }

    @Override
    public String toString() {
      // Called in AsyncLazyPersistService.execute for displaying error messages.
      return "LazyWriter async task of persist RamDisk block pool id:"
          + bpId + " block pool id: "
          + blockId + " with block file " + blockFile
          + " and meta file " + metaFile + " to target volume " + targetVolume;
    }

    @Override
    public void run() {
      boolean succeeded = false;
      try {
        // No FsDatasetImpl lock for the file copy
        File targetFiles[] = FsDatasetImpl.copyBlockFiles(
            blockId, genStamp, metaFile, blockFile, lazyPersistDir);

        // Lock FsDataSetImpl during onCompleteLazyPersist callback
        datanode.getFSDataset().onCompleteLazyPersist(bpId, blockId,
            creationTime, targetFiles, targetVolume);
        succeeded = true;
      } catch (Exception e){
        FsDatasetImpl.LOG.warn(
            "LazyWriter failed to async persist RamDisk block pool id: "
            + bpId + "block Id: " + blockId);
      } finally {
        if (!succeeded) {
          datanode.getFSDataset().onFailLazyPersist(bpId, blockId);
        }
      }
    }
  }
}
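The service above boils down to a map from volume root directory to a single-thread executor, so writes to one disk are serialized while different disks persist blocks in parallel. A self-contained model of that scheme using plain JDK types (the class and method names below are illustrative, not the HDFS implementation):

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PerVolumeExecutors {
  // One single-threaded executor per volume root.
  private final Map<File, ExecutorService> executors =
      new HashMap<File, ExecutorService>();

  synchronized void addVolume(File root) {
    if (!executors.containsKey(root)) {
      executors.put(root, Executors.newSingleThreadExecutor());
    }
  }

  synchronized void removeVolume(File root) {
    ExecutorService e = executors.remove(root);
    if (e != null) {
      e.shutdown();   // lets queued tasks for that disk finish
    }
  }

  synchronized void execute(File root, Runnable task) {
    ExecutorService e = executors.get(root);
    if (e == null) {
      throw new IllegalStateException("No executor for volume " + root);
    }
    e.execute(task);  // tasks for the same disk run one at a time, in order
  }
}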

File: SimulatedFSDataset.java

@@ -49,6 +49,7 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;

@@ -1209,5 +1210,16 @@ public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
      FileDescriptor fd, long offset, long nbytes, int flags) {
    throw new UnsupportedOperationException();
  }
+
+  @Override
+  public void onCompleteLazyPersist(String bpId, long blockId,
+      long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void onFailLazyPersist(String bpId, long blockId) {
+    throw new UnsupportedOperationException();
+  }
}

File: TestLazyPersistFiles.java (test)

@@ -971,7 +971,9 @@ JMXGet initJMX() throws Exception
  void printRamDiskJMXMetrics() {
    try {
-      jmx.printAllMatchedAttributes(JMX_RAM_DISK_METRICS_PATTERN);
+      if (jmx != null) {
+        jmx.printAllMatchedAttributes(JMX_RAM_DISK_METRICS_PATTERN);
+      }
    } catch (Exception e) {
      e.printStackTrace();
    }