diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index b5699f40bb0..6da5b2b73d9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -357,6 +357,8 @@ Trunk (Unreleased)
 
     HDFS-7171. Fix Jenkins failures in HDFS-6581 branch. (Arpit Agarwal)
 
+    HDFS-7112. LazyWriter should use either async IO or one thread per physical
+    disk. (Xiaoyu Yao via cnauroth)
+
 Release 2.7.0 - UNRELEASED
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 2bb2e7f0210..3f1400dfaa0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.server.datanode.Replica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -463,5 +464,15 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
 
   public void submitBackgroundSyncFileRangeRequest(final ExtendedBlock block,
       final FileDescriptor fd, final long offset, final long nbytes,
       final int flags);
-}
+  /**
+   * Callback from RamDiskAsyncLazyPersistService when an async lazy persist task completes.
+   */
+  public void onCompleteLazyPersist(String bpId, long blockId,
+      long creationTime, File[] savedFiles, FsVolumeImpl targetVolume);
+
+  /**
+   * Callback from RamDiskAsyncLazyPersistService when an async lazy persist task fails.
+   */
+  public void onFailLazyPersist(String bpId, long blockId);
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 3eeb3eff705..384a80ac3f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -165,6 +165,10 @@ class BlockPoolSlice {
   long getDfsUsed() throws IOException {
     return dfsUsage.getUsed();
   }
+
+  void incDfsUsed(long value) {
+    dfsUsage.incDfsUsed(value);
+  }
 
   /**
    * Read in the cached DU value and return it if it is less than 600 seconds
@@ -270,23 +274,6 @@ class BlockPoolSlice {
     return blockFile;
   }
 
-  /**
-   * Save the given replica to persistent storage.
-   *
-   * @return The saved meta and block files, in that order.
-   * @throws IOException
-   */
-  File[] lazyPersistReplica(long blockId, long genStamp,
-      File srcMeta, File srcFile) throws IOException {
-    if (!lazypersistDir.exists() && !lazypersistDir.mkdirs()) {
-      FsDatasetImpl.LOG.warn("Failed to create " + lazypersistDir);
-    }
-    File targetFiles[] = FsDatasetImpl.copyBlockFiles(
-        blockId, genStamp, srcMeta, srcFile, lazypersistDir);
-    dfsUsage.incDfsUsed(targetFiles[0].length() + targetFiles[1].length());
-    return targetFiles;
-  }
-
   /**
    * Move a persisted replica from lazypersist directory to a subdirectory
    * under finalized.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index df52e14a7c2..4ac77ed46a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -219,6 +219,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   final ReplicaMap volumeMap;
   final RamDiskReplicaTracker ramDiskReplicaTracker;
+  final RamDiskAsyncLazyPersistService asyncLazyPersistService;
 
   private static final int MAX_BLOCK_EVICTIONS_PER_ITERATION = 3;
 
@@ -272,10 +273,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         VolumeChoosingPolicy.class), conf);
     volumes = new FsVolumeList(volsFailed, blockChooserImpl);
     asyncDiskService = new FsDatasetAsyncDiskService(datanode);
+    asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode);
 
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
       addVolume(dataLocations, storage.getStorageDir(idx));
     }
+    setupAsyncLazyPersistThreads();
 
     cacheManager = new FsDatasetCache(this);
 
@@ -408,6 +411,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
     }
 
+    setupAsyncLazyPersistThreads();
+
     for (int i = 0; i < volumes.size(); i++) {
       if (successFlags[i]) {
         succeedVolumes.add(volumes.get(i));
@@ -461,6 +466,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         storageMap.remove(sd.getStorageUuid());
       }
     }
+    setupAsyncLazyPersistThreads();
   }
 
   private StorageType getStorageTypeFromLocations(
@@ -1505,10 +1511,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       RamDiskReplica replicaInfo = ramDiskReplicaTracker.getReplica(
          bpid, invalidBlks[i].getBlockId());
       if (replicaInfo != null) {
-        if (replicaInfo.getIsPersisted() == false) {
+        if (!replicaInfo.getIsPersisted()) {
           datanode.getMetrics().incrRamDiskBlocksDeletedBeforeLazyPersisted();
         }
-        discardRamDiskReplica(replicaInfo, true);
+        ramDiskReplicaTracker.discardReplica(replicaInfo.getBlockPoolId(),
+            replicaInfo.getBlockId(), true);
       }
     }
 
@@ -1749,6 +1756,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     if (asyncDiskService != null) {
       asyncDiskService.shutdown();
     }
+
+    if (asyncLazyPersistService != null) {
+      asyncLazyPersistService.shutdown();
+    }
 
     if(volumes != null) {
       volumes.shutdown();
@@ -2307,6 +2318,40 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     return new RollingLogsImpl(dir, prefix);
   }
 
+  @Override
+  public void onCompleteLazyPersist(String bpId, long blockId,
+      long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) {
+    synchronized (FsDatasetImpl.this) {
+      ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles);
+
+      targetVolume.incDfsUsed(bpId,
+          savedFiles[0].length() + savedFiles[1].length());
+
+      // Update metrics (ignore the metadata file size)
+      datanode.getMetrics().incrRamDiskBlocksLazyPersisted();
+      datanode.getMetrics().incrRamDiskBytesLazyPersisted(savedFiles[1].length());
+      datanode.getMetrics().addRamDiskBlocksLazyPersistWindowMs(
+          Time.monotonicNow() - creationTime);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("LazyWriter: Finish persisting RamDisk block:"
+            + " block pool id: " + bpId + " block id: " + blockId
+            + " to block file " + savedFiles[1]
+            + " and meta file " + savedFiles[0]
+            + " on target volume " + targetVolume);
+      }
+    }
+  }
+
+  @Override
+  public void onFailLazyPersist(String bpId, long blockId) {
+    RamDiskReplica block = ramDiskReplicaTracker.getReplica(bpId, blockId);
+    if (block != null) {
+      LOG.warn("Failed to save replica " + block + ". Re-enqueueing it.");
+      ramDiskReplicaTracker.reenqueueReplicaNotPersisted(block);
+    }
+  }
+
   @Override
   public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
       FileDescriptor fd, long offset, long nbytes, int flags) {
@@ -2315,9 +2360,38 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         nbytes, flags);
   }
 
-  void discardRamDiskReplica(RamDiskReplica replica, boolean deleteSavedCopies) {
-    ramDiskReplicaTracker.discardReplica(replica.getBlockPoolId(),
-        replica.getBlockId(), deleteSavedCopies);
+  private boolean ramDiskConfigured() {
+    for (FsVolumeImpl v : getVolumes()) {
+      if (v.isTransientStorage()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  // Add/remove the per-DISK-volume async lazy persist thread when a RamDisk
+  // volume is added or removed.
+  // This should only be called when the FsDatasetImpl#volumes list is finalized.
+  private void setupAsyncLazyPersistThreads() {
+    boolean ramDiskConfigured = ramDiskConfigured();
+    for (FsVolumeImpl v : getVolumes()) {
+      // Skip transient volumes
+      if (v.isTransientStorage()) {
+        continue;
+      }
+
+      // Add a thread for the DISK volume if RamDisk is configured
+      if (ramDiskConfigured &&
+          !asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
+        asyncLazyPersistService.addVolume(v.getCurrentDir());
+      }
+
+      // Remove the thread for the DISK volume if RamDisk is not configured
+      if (!ramDiskConfigured &&
+          asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
+        asyncLazyPersistService.removeVolume(v.getCurrentDir());
+      }
+    }
   }
 
   class LazyWriter implements Runnable {
@@ -2343,61 +2417,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS_DEFAULT);
     }
 
-    private void moveReplicaToNewVolume(String bpid, long blockId, long creationTime)
-        throws IOException {
-
-      FsVolumeImpl targetVolume;
-      ReplicaInfo replicaInfo;
-      BlockPoolSlice bpSlice;
-      File srcFile, srcMeta;
-      long genStamp;
-
-      synchronized (FsDatasetImpl.this) {
-        replicaInfo = volumeMap.get(bpid, blockId);
-
-        if (replicaInfo == null || !replicaInfo.getVolume().isTransientStorage()) {
-          // The block was either deleted before it could be checkpointed or
-          // it is already on persistent storage. This can occur if a second
-          // replica on persistent storage was found after the lazy write was
-          // scheduled.
-          return;
-        }
-
-        // Pick a target volume for the block.
-        targetVolume = volumes.getNextVolume(
-            StorageType.DEFAULT, replicaInfo.getNumBytes());
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid);
-        }
-
-        ramDiskReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
-        bpSlice = targetVolume.getBlockPoolSlice(bpid);
-        srcMeta = replicaInfo.getMetaFile();
-        srcFile = replicaInfo.getBlockFile();
-        genStamp = replicaInfo.getGenerationStamp();
-      }
-
-      // Drop the FsDatasetImpl lock for the file copy.
-      File[] savedFiles =
-          bpSlice.lazyPersistReplica(blockId, genStamp, srcMeta, srcFile);
-
-      synchronized (FsDatasetImpl.this) {
-        ramDiskReplicaTracker.recordEndLazyPersist(bpid, blockId, savedFiles);
-
-        // Update metrics (ignore the metadata file size)
-        datanode.getMetrics().incrRamDiskBlocksLazyPersisted();
-        datanode.getMetrics().incrRamDiskBytesLazyPersisted(replicaInfo.getNumBytes());
-        datanode.getMetrics().addRamDiskBlocksLazyPersistWindowMs(
-            Time.monotonicNow() - creationTime);
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
-              " to file " + savedFiles[1]);
-        }
-      }
-    }
-
     /**
      * Checkpoint a pending replica to persistent storage now.
      * If we fail then move the replica to the end of the queue.
      */
@@ -2405,13 +2424,43 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     private boolean saveNextReplica() {
       RamDiskReplica block = null;
+      FsVolumeImpl targetVolume;
+      ReplicaInfo replicaInfo;
       boolean succeeded = false;
 
       try {
         block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
         if (block != null) {
-          moveReplicaToNewVolume(block.getBlockPoolId(), block.getBlockId(),
-              block.getCreationTime());
+          synchronized (FsDatasetImpl.this) {
+            replicaInfo = volumeMap.get(block.getBlockPoolId(), block.getBlockId());
+
+            // If replicaInfo is null, the block was either deleted before
+            // it could be checkpointed or it is already on persistent storage.
+            // This can occur if a second replica on persistent storage was found
+            // after the lazy write was scheduled.
+            if (replicaInfo != null &&
+                replicaInfo.getVolume().isTransientStorage()) {
+              // Pick a target volume to persist the block.
+              targetVolume = volumes.getNextVolume(
+                  StorageType.DEFAULT, replicaInfo.getNumBytes());
+
+              ramDiskReplicaTracker.recordStartLazyPersist(
+                  block.getBlockPoolId(), block.getBlockId(), targetVolume);
+
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("LazyWriter: Start persisting RamDisk block:"
+                    + " block pool id: " + block.getBlockPoolId()
+                    + " block id: " + block.getBlockId()
+                    + " on target volume " + targetVolume);
+              }
+
+              asyncLazyPersistService.submitLazyPersistTask(
+                  block.getBlockPoolId(), block.getBlockId(),
+                  replicaInfo.getGenerationStamp(), block.getCreationTime(),
+                  replicaInfo.getMetaFile(), replicaInfo.getBlockFile(),
+                  targetVolume);
+            }
+          }
         }
         succeeded = true;
       } catch(IOException ioe) {
@@ -2419,10 +2468,9 @@
       } finally {
         if (!succeeded && block != null) {
re-enqueueing it."); - ramDiskReplicaTracker.reenqueueReplicaNotPersisted(block); + onFailLazyPersist(block.getBlockPoolId(), block.getBlockId()); } } - return succeeded; } @@ -2479,7 +2527,8 @@ class FsDatasetImpl implements FsDatasetSpi { metaFile = replicaInfo.getMetaFile(); blockFileUsed = blockFile.length(); metaFileUsed = metaFile.length(); - discardRamDiskReplica(replicaState, false); + ramDiskReplicaTracker.discardReplica(replicaState.getBlockPoolId(), + replicaState.getBlockId(), false); // Move the replica from lazyPersist/ to finalized/ on target volume BlockPoolSlice bpSlice = diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java index 60fb71d63b0..1d7540ccb1c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java @@ -124,7 +124,11 @@ public class FsVolumeImpl implements FsVolumeSpi { File getRbwDir(String bpid) throws IOException { return getBlockPoolSlice(bpid).getRbwDir(); } - + + File getLazyPersistDir(String bpid) throws IOException { + return getBlockPoolSlice(bpid).getLazypersistDir(); + } + void decDfsUsed(String bpid, long value) { synchronized(dataset) { BlockPoolSlice bp = bpSlices.get(bpid); @@ -134,6 +138,15 @@ public class FsVolumeImpl implements FsVolumeSpi { } } + void incDfsUsed(String bpid, long value) { + synchronized(dataset) { + BlockPoolSlice bp = bpSlices.get(bpid); + if (bp != null) { + bp.incDfsUsed(value); + } + } + } + long getDfsUsed() throws IOException { long dfsUsed = 0; synchronized(dataset) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java new file mode 100644 index 00000000000..76acbea42b6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class is a container of multiple thread pools, one for each non-RamDisk
+ * volume, each with a maximum thread count of 1, so that async lazy persist
+ * operations can be scheduled easily as volumes arrive and depart.
+ *
+ * This class and {@link org.apache.hadoop.util.AsyncDiskService} are similar.
+ * They should be combined.
+ */
+class RamDiskAsyncLazyPersistService {
+  public static final Log LOG =
+      LogFactory.getLog(RamDiskAsyncLazyPersistService.class);
+
+  // ThreadPool core pool size
+  private static final int CORE_THREADS_PER_VOLUME = 1;
+  // ThreadPool maximum pool size
+  private static final int MAXIMUM_THREADS_PER_VOLUME = 1;
+  // ThreadPool keep-alive time for threads over core pool size
+  private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
+
+  private final DataNode datanode;
+  private final ThreadGroup threadGroup;
+  private Map<File, ThreadPoolExecutor> executors
+      = new HashMap<File, ThreadPoolExecutor>();
+
+  /**
+   * Create a RamDiskAsyncLazyPersistService with a set of volumes (specified
+   * by their root directories).
+   *
+   * The RamDiskAsyncLazyPersistService uses one ThreadPool per volume to do
+   * the async disk operations.
+   */
+  RamDiskAsyncLazyPersistService(DataNode datanode) {
+    this.datanode = datanode;
+    this.threadGroup = new ThreadGroup(getClass().getSimpleName());
+  }
+
+  private void addExecutorForVolume(final File volume) {
+    ThreadFactory threadFactory = new ThreadFactory() {
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread t = new Thread(threadGroup, r);
+        t.setName("Async RamDisk lazy persist worker for volume " + volume);
+        return t;
+      }
+    };
+
+    ThreadPoolExecutor executor = new ThreadPoolExecutor(
+        CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME,
+        THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<Runnable>(), threadFactory);
+
+    // This can reduce the number of running threads
+    executor.allowCoreThreadTimeOut(true);
+    executors.put(volume, executor);
+  }
+
+  /**
+   * Starts AsyncLazyPersistService for a new volume.
+   * @param volume the root of the new data volume.
+   */
+  synchronized void addVolume(File volume) {
+    if (executors == null) {
+      throw new RuntimeException("AsyncLazyPersistService is already shutdown");
+    }
+    ThreadPoolExecutor executor = executors.get(volume);
+    if (executor != null) {
+      throw new RuntimeException("Volume " + volume + " already exists.");
+    }
+    addExecutorForVolume(volume);
+  }
+
+  /**
+   * Stops AsyncLazyPersistService for a volume.
+   * @param volume the root of the volume.
+   */
+  synchronized void removeVolume(File volume) {
+    if (executors == null) {
+      throw new RuntimeException("AsyncLazyPersistService is already shutdown");
+    }
+    ThreadPoolExecutor executor = executors.get(volume);
+    if (executor == null) {
+      throw new RuntimeException("Cannot find volume " + volume
+          + " to remove.");
+    } else {
+      executor.shutdown();
+      executors.remove(volume);
+    }
+  }
+
+  /**
+   * Query whether the thread pool exists for the given volume.
+   * @param volume the root of a volume
+   * @return true if there is one thread pool for the volume,
+   *         false otherwise
+   */
+  synchronized boolean queryVolume(File volume) {
+    if (executors == null) {
+      throw new RuntimeException("AsyncLazyPersistService is already shutdown");
+    }
+    ThreadPoolExecutor executor = executors.get(volume);
+    return (executor != null);
+  }
+
+  /**
+   * Execute the task sometime in the future, using ThreadPools.
+   */
+  synchronized void execute(File root, Runnable task) {
+    if (executors == null) {
+      throw new RuntimeException("AsyncLazyPersistService is already shutdown");
+    }
+    ThreadPoolExecutor executor = executors.get(root);
+    if (executor == null) {
+      throw new RuntimeException("Cannot find root " + root
+          + " for execution of task " + task);
+    } else {
+      executor.execute(task);
+    }
+  }
+
+  /**
+   * Gracefully shut down all ThreadPools. Previously submitted lazy persist
+   * tasks are allowed to finish; no new tasks are accepted.
+   */
+  synchronized void shutdown() {
+    if (executors == null) {
+      LOG.warn("AsyncLazyPersistService has already shut down.");
+    } else {
+      LOG.info("Shutting down all async lazy persist service threads");
+
+      for (Map.Entry<File, ThreadPoolExecutor> e : executors.entrySet()) {
+        e.getValue().shutdown();
+      }
+      // Clear the executor map so that calling execute again will fail.
+      executors = null;
+      LOG.info("All async lazy persist service threads have been shut down");
+    }
+  }
+
+  /**
+   * Asynchronously lazy persist the block from the RamDisk to Disk.
+   */
+  void submitLazyPersistTask(String bpId, long blockId,
+      long genStamp, long creationTime,
+      File metaFile, File blockFile,
+      FsVolumeImpl targetVolume) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("LazyWriter: Schedule async task to persist RamDisk block:"
+          + " block pool id: " + bpId + " block id: " + blockId);
+    }
+
+    File lazyPersistDir = targetVolume.getLazyPersistDir(bpId);
+    if (!lazyPersistDir.exists() && !lazyPersistDir.mkdirs()) {
+      FsDatasetImpl.LOG.warn("LazyWriter failed to create " + lazyPersistDir);
+      throw new IOException("LazyWriter failed to find or create lazy persist"
+          + " dir: " + lazyPersistDir.toString());
+    }
+
+    ReplicaLazyPersistTask lazyPersistTask = new ReplicaLazyPersistTask(
+        bpId, blockId, genStamp, creationTime, blockFile, metaFile,
+        targetVolume, lazyPersistDir);
+    execute(targetVolume.getCurrentDir(), lazyPersistTask);
+  }
+
+  class ReplicaLazyPersistTask implements Runnable {
+    final String bpId;
+    final long blockId;
+    final long genStamp;
+    final long creationTime;
+    final File blockFile;
+    final File metaFile;
+    final FsVolumeImpl targetVolume;
+    final File lazyPersistDir;
+
+    ReplicaLazyPersistTask(String bpId, long blockId,
+        long genStamp, long creationTime,
+        File blockFile, File metaFile,
+        FsVolumeImpl targetVolume, File lazyPersistDir) {
+      this.bpId = bpId;
+      this.blockId = blockId;
+      this.genStamp = genStamp;
+      this.creationTime = creationTime;
+      this.blockFile = blockFile;
+      this.metaFile = metaFile;
+      this.targetVolume = targetVolume;
+      this.lazyPersistDir = lazyPersistDir;
+    }
+
+    @Override
+    public String toString() {
+      // Called in AsyncLazyPersistService.execute for displaying error messages.
+      return "LazyWriter async task to persist RamDisk block:"
+          + " block pool id: " + bpId + " block id: " + blockId
+          + " with block file " + blockFile
+          + " and meta file " + metaFile
+          + " to target volume " + targetVolume;
+    }
+
+    @Override
+    public void run() {
+      boolean succeeded = false;
+      try {
+        // No FsDatasetImpl lock for the file copy
+        File[] targetFiles = FsDatasetImpl.copyBlockFiles(
+            blockId, genStamp, metaFile, blockFile, lazyPersistDir);
+
+        // Lock FsDatasetImpl during the onCompleteLazyPersist callback
+        datanode.getFSDataset().onCompleteLazyPersist(bpId, blockId,
+            creationTime, targetFiles, targetVolume);
+        succeeded = true;
+      } catch (Exception e) {
+        FsDatasetImpl.LOG.warn("LazyWriter failed to async persist RamDisk block:"
+            + " block pool id: " + bpId + " block id: " + blockId, e);
+      } finally {
+        if (!succeeded) {
+          datanode.getFSDataset().onFailLazyPersist(bpId, blockId);
+        }
+      }
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index d1284fe03a3..0786bc69e58 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -1209,5 +1210,16 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       FileDescriptor fd, long offset, long nbytes, int flags) {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public void onCompleteLazyPersist(String bpId, long blockId,
+      long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void onFailLazyPersist(String bpId, long blockId) {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index 91deb55ee5a..9f1d50a1e32 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -971,7 +971,9 @@ public class TestLazyPersistFiles {
 
   void printRamDiskJMXMetrics() {
     try {
-      jmx.printAllMatchedAttributes(JMX_RAM_DISK_METRICS_PATTERN);
+      if (jmx != null) {
+        jmx.printAllMatchedAttributes(JMX_RAM_DISK_METRICS_PATTERN);
+      }
     } catch (Exception e) {
       e.printStackTrace();
     }