From 34d0088bf9d793de888c8dd969a047dbae77cfb5 Mon Sep 17 00:00:00 2001 From: arp Date: Wed, 27 Aug 2014 15:35:47 -0700 Subject: [PATCH] HDFS-6925. DataNode should attempt to place replicas on transient storage first if lazyPersist flag is received. (Arpit Agarwal) Conflicts: hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java --- .../hdfs/server/datanode/BlockReceiver.java | 5 +- .../hdfs/server/datanode/DataXceiver.java | 6 +- .../server/datanode/DirectoryScanner.java | 2 +- .../hadoop/hdfs/server/datanode/Replica.java | 5 ++ .../hdfs/server/datanode/ReplicaInfo.java | 30 ++-------- .../server/datanode/ReplicaUnderRecovery.java | 3 +- .../AvailableSpaceVolumeChoosingPolicy.java | 2 +- .../datanode/fsdataset/FsDatasetSpi.java | 4 +- .../datanode/fsdataset/FsVolumeSpi.java | 5 +- .../RoundRobinVolumeChoosingPolicy.java | 13 +++- .../fsdataset/impl/FsDatasetImpl.java | 49 +++++++++++---- .../fsdataset/impl/FsTransientVolumeImpl.java | 60 +++++++++++++++++++ .../datanode/fsdataset/impl/FsVolumeImpl.java | 12 +++- .../fsdataset/impl/FsVolumeImplAllocator.java | 44 ++++++++++++++ .../datanode/fsdataset/impl/FsVolumeList.java | 20 ++++++- .../TestWriteBlockGetsBlockLengthHint.java | 5 +- .../server/datanode/BlockReportTestBase.java | 2 +- .../server/datanode/SimulatedFSDataset.java | 10 +++- .../server/datanode/TestBlockRecovery.java | 4 +- .../server/datanode/TestDirectoryScanner.java | 6 +- .../datanode/TestSimulatedFSDataset.java | 2 +- .../fsdataset/impl/FsDatasetTestUtil.java | 2 +- .../fsdataset/impl/TestWriteToReplica.java | 12 ++-- 23 files changed, 231 insertions(+), 72 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsTransientVolumeImpl.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplAllocator.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index bfb22331250..4d1cc6c256c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -139,7 +139,8 @@ class BlockReceiver implements Closeable { final long newGs, final long minBytesRcvd, final long maxBytesRcvd, final String clientname, final DatanodeInfo srcDataNode, final DataNode datanode, DataChecksum requestedChecksum, - CachingStrategy cachingStrategy) throws IOException { + CachingStrategy cachingStrategy, + final boolean allowLazyPersist) throws IOException { try{ this.block = block; this.in = in; @@ -180,7 +181,7 @@ class BlockReceiver implements Closeable { } else { switch (stage) { case PIPELINE_SETUP_CREATE: - replicaInfo = datanode.data.createRbw(storageType, block); + replicaInfo = datanode.data.createRbw(storageType, block, allowLazyPersist); datanode.notifyNamenodeReceivingBlock( block, replicaInfo.getStorageUuid()); break; diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index 3b8304e7187..67eb9418115 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -607,8 +607,8 @@ public void writeBlock(final ExtendedBlock block, peer.getLocalAddressString(), stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd, clientname, srcDataNode, datanode, requestedChecksum, - cachingStrategy); - + cachingStrategy, allowLazyPersist); + storageUuid = blockReceiver.getStorageUuid(); } else { storageUuid = datanode.data.recoverClose( @@ -1048,7 +1048,7 @@ public void replaceBlock(final ExtendedBlock block, proxyReply, proxySock.getRemoteSocketAddress().toString(), proxySock.getLocalSocketAddress().toString(), null, 0, 0, 0, "", null, datanode, remoteChecksum, - CachingStrategy.newDropBehind()); + CachingStrategy.newDropBehind(), false); // receive a block blockReceiver.receiveBlock(null, null, replyOut, null, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java index a47f2ef4a06..c313b044b20 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java @@ -399,7 +399,7 @@ void shutdown() { /** * Reconcile differences between disk and in-memory blocks */ - void reconcile() { + void reconcile() throws IOException { scan(); for (Entry> entry : diffs.entrySet()) { String bpid = entry.getKey(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java index a480bb161f1..b6e5ba91e00 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java @@ -59,4 +59,9 @@ public interface Replica { * Return the storageUuid of the volume that stores this replica. */ public String getStorageUuid(); + + /** + * Return true if the target volume is backed by RAM. 
+ */ + public boolean isOnTransientStorage(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java index 49ac605a352..940d3eb5164 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java @@ -61,17 +61,6 @@ abstract public class ReplicaInfo extends Block implements Replica { private static final Map internedBaseDirs = new HashMap(); - /** - * Constructor for a zero length replica - * @param blockId block id - * @param genStamp replica generation stamp - * @param vol volume where replica is located - * @param dir directory path where block and meta files are located - */ - ReplicaInfo(long blockId, long genStamp, FsVolumeSpi vol, File dir) { - this( blockId, 0L, genStamp, vol, dir); - } - /** * Constructor * @param block a block @@ -296,20 +285,6 @@ public boolean unlinkBlock(int numLinks) throws IOException { return true; } - /** - * Set this replica's generation stamp to be a newer one - * @param newGS new generation stamp - * @throws IOException is the new generation stamp is not greater than the current one - */ - void setNewerGenerationStamp(long newGS) throws IOException { - long curGS = getGenerationStamp(); - if (newGS <= curGS) { - throw new IOException("New generation stamp (" + newGS - + ") must be greater than current one (" + curGS + ")"); - } - setGenerationStamp(newGS); - } - @Override //Object public String toString() { return getClass().getSimpleName() @@ -321,4 +296,9 @@ public String toString() { + "\n getVolume() = " + getVolume() + "\n getBlockFile() = " + getBlockFile(); } + + @Override + public boolean isOnTransientStorage() { + return volume.isTransientStorage(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java index 35f7c93a9d2..2cd8a01218e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java @@ -37,8 +37,7 @@ public class ReplicaUnderRecovery extends ReplicaInfo { // that the replica will be bumped to after recovery public ReplicaUnderRecovery(ReplicaInfo replica, long recoveryId) { - super(replica.getBlockId(), replica.getNumBytes(), replica.getGenerationStamp(), - replica.getVolume(), replica.getDir()); + super(replica, replica.getVolume(), replica.getDir()); if ( replica.getState() != ReplicaState.FINALIZED && replica.getState() != ReplicaState.RBW && replica.getState() != ReplicaState.RWR ) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java index ec19ec58601..235cd7b9c62 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java @@ -99,7 +99,7 @@ public synchronized Configuration getConf() { @Override public synchronized V chooseVolume(List volumes, - final long replicaSize) throws IOException { + long replicaSize) throws IOException { if (volumes.size() < 1) { throw new DiskOutOfSpaceException("No more available volumes"); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index 4c03151e3b1..4a5580c1f0d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -122,7 +122,7 @@ public StorageReport[] getStorageReports(String bpid) * as corrupted. */ public void checkAndUpdate(String bpid, long blockId, File diskFile, - File diskMetaFile, FsVolumeSpi vol); + File diskMetaFile, FsVolumeSpi vol) throws IOException; /** * @param b - the block @@ -197,7 +197,7 @@ public ReplicaInPipelineInterface createTemporary(StorageType storageType, * @throws IOException if an error occurs */ public ReplicaInPipelineInterface createRbw(StorageType storageType, - ExtendedBlock b) throws IOException; + ExtendedBlock b, boolean allowLazyPersist) throws IOException; /** * Recovers a RBW replica and returns the meta info of the replica diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java index cba23c3d4d2..4f459226aea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java @@ -56,4 +56,7 @@ public interface FsVolumeSpi { * Release disk space previously reserved for RBW block. */ public void releaseReservedSpace(long bytesToRelease); -} \ No newline at end of file + + /** Returns true if the volume is NOT backed by persistent storage. 
*/ + public boolean isTransientStorage(); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java index 7f4bdaeb633..55a356027c7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java @@ -20,6 +20,9 @@ import java.io.IOException; import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; /** @@ -27,12 +30,14 @@ */ public class RoundRobinVolumeChoosingPolicy<V extends FsVolumeSpi> implements VolumeChoosingPolicy<V> { + public static final Log LOG = LogFactory.getLog(RoundRobinVolumeChoosingPolicy.class); private int curVolume = 0; @Override - public synchronized V chooseVolume(final List<V> volumes, final long blockSize - ) throws IOException { + public synchronized V chooseVolume(final List<V> volumes, long blockSize) + throws IOException { + if(volumes.size() < 1) { throw new DiskOutOfSpaceException("No more available volumes"); } @@ -50,7 +55,9 @@ public synchronized V chooseVolume(final List<V> volumes, final long blockSize final V volume = volumes.get(curVolume); curVolume = (curVolume + 1) % volumes.size(); long availableVolumeSize = volume.getAvailable(); - if (availableVolumeSize > blockSize) { return volume; } + if (availableVolumeSize > blockSize) { + return volume; + } if (availableVolumeSize > maxAvailable) { maxAvailable = availableVolumeSize; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 57706f818c8..701207d4768 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -45,6 +45,9 @@ import javax.management.StandardMBean; import com.google.common.collect.Lists; +import com.google.common.base.Preconditions; +import com.google.common.collect.TreeMultimap; +import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -278,7 +281,7 @@ private void addVolume(Collection<StorageLocation> dataLocations, // If IOException raises from FsVolumeImpl() or getVolumeMap(), there is // nothing needed to be rolled back to make various data structures, e.g., // storageMap and asyncDiskService, consistent. - FsVolumeImpl fsVolume = new FsVolumeImpl( + FsVolumeImpl fsVolume = FsVolumeImplAllocator.createVolume( this, sd.getStorageUuid(), dir, this.conf, storageType); ReplicaMap tempVolumeMap = new ReplicaMap(this); fsVolume.getVolumeMap(tempVolumeMap); @@ -550,16 +553,16 @@ public long getLength(ExtendedBlock b) throws IOException { * Get File name for a given block. 
*/ private File getBlockFile(ExtendedBlock b) throws IOException { - return getBlockFile(b.getBlockPoolId(), b.getLocalBlock()); + return getBlockFile(b.getBlockPoolId(), b.getBlockId()); } /** * Get File name for a given block. */ - File getBlockFile(String bpid, Block b) throws IOException { - File f = validateBlockFile(bpid, b); + File getBlockFile(String bpid, long blockId) throws IOException { + File f = validateBlockFile(bpid, blockId); if(f == null) { - throw new IOException("Block " + b + " is not valid."); + throw new IOException("BlockId " + blockId + " is not valid."); } return f; } @@ -949,8 +952,8 @@ private void bumpReplicaGS(ReplicaInfo replicaInfo, @Override // FsDatasetSpi public synchronized ReplicaInPipeline createRbw(StorageType storageType, - ExtendedBlock b) throws IOException { - ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), + ExtendedBlock b, boolean allowLazyPersist) throws IOException { + ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (replicaInfo != null) { throw new ReplicaAlreadyExistsException("Block " + b + @@ -958,8 +961,25 @@ public synchronized ReplicaInPipeline createRbw(StorageType storageType, " and thus cannot be created."); } // create a new block - FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes()); - // create a rbw file to hold block in the designated volume + FsVolumeImpl v; + while (true) { + try { + if (allowLazyPersist) { + // First try to place the block on a transient volume. + v = volumes.getNextTransientVolume(b.getNumBytes()); + } else { + v = volumes.getNextVolume(storageType, b.getNumBytes()); + } + } catch (DiskOutOfSpaceException de) { + if (allowLazyPersist) { + allowLazyPersist = false; + continue; + } + throw de; + } + break; + } + // create an rbw file to hold block in the designated volume File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock()); ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes()); @@ -1321,11 +1341,11 @@ private boolean isValid(final ExtendedBlock b, final ReplicaState state) { /** * Find the file corresponding to the block and return it if it exists. */ - File validateBlockFile(String bpid, Block b) { + File validateBlockFile(String bpid, long blockId) { //Should we check for metadata file too? 
final File f; synchronized(this) { - f = getFile(bpid, b.getBlockId()); + f = getFile(bpid, blockId); } if(f != null ) { @@ -1337,7 +1357,7 @@ File validateBlockFile(String bpid, Block b) { } if (LOG.isDebugEnabled()) { - LOG.debug("b=" + b + ", f=" + f); + LOG.debug("blockId=" + blockId + ", f=" + f); } return null; } @@ -1497,6 +1517,11 @@ private void cacheBlock(String bpid, long blockId) { ": volume was not an instance of FsVolumeImpl."); return; } + if (volume.isTransientStorage()) { + LOG.warn("Caching not supported on block with id " + blockId + + " since the volume is backed by RAM."); + return; + } success = true; } finally { if (!success) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsTransientVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsTransientVolumeImpl.java new file mode 100644 index 00000000000..dafa74f5615 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsTransientVolumeImpl.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; + +import java.io.File; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ThreadPoolExecutor; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.StorageType; + +/** + * Volume for storing replicas in memory. These can be deleted at any time + * to make space for new replicas and there is no persistence guarantee. + * + * The backing store for these replicas is expected to be RAM_DISK. + * The backing store may be disk when testing. + * + * It uses the {@link FsDatasetImpl} object for synchronization. + */ +@InterfaceAudience.Private +@VisibleForTesting +public class FsTransientVolumeImpl extends FsVolumeImpl { + + + FsTransientVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir, + Configuration conf, StorageType storageType) + throws IOException { + super(dataset, storageID, currentDir, conf, storageType); + } + + @Override + protected ThreadPoolExecutor initializeCacheExecutor(File parent) { + // Can't 'cache' replicas already in RAM. 
+ return null; + } + + @Override + public boolean isTransientStorage() { + return true; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java index 40c68404a79..4c0b5f8b8d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java @@ -77,7 +77,7 @@ public class FsVolumeImpl implements FsVolumeSpi { * dfs.datanode.fsdatasetcache.max.threads.per.volume) to limit resource * contention. */ - private final ThreadPoolExecutor cacheExecutor; + protected ThreadPoolExecutor cacheExecutor; FsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir, Configuration conf, StorageType storageType) throws IOException { @@ -201,6 +201,11 @@ public String getBasePath() { return currentDir.getParent(); } + @Override + public boolean isTransientStorage() { + return false; + } + @Override public String getPath(String bpid) throws IOException { return getBlockPoolSlice(bpid).getDirectory().getAbsolutePath(); @@ -324,7 +329,9 @@ public String toString() { } void shutdown() { - cacheExecutor.shutdown(); + if (cacheExecutor != null) { + cacheExecutor.shutdown(); + } Set> set = bpSlices.entrySet(); for (Entry entry : set) { entry.getValue().shutdown(); @@ -417,6 +424,5 @@ public StorageType getStorageType() { DatanodeStorage toDatanodeStorage() { return new DatanodeStorage(storageID, DatanodeStorage.State.NORMAL, storageType); } - } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplAllocator.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplAllocator.java new file mode 100644 index 00000000000..14d3aaffef3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplAllocator.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; + +import java.io.File; +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.StorageType; + +/** + * Generate volumes based on the storageType. 
+ */ +@InterfaceAudience.Private +class FsVolumeImplAllocator { + static FsVolumeImpl createVolume(FsDatasetImpl fsDataset, String storageUuid, + File dir, Configuration conf, StorageType storageType) + throws IOException { + switch(storageType) { + case RAM_DISK: + return new FsTransientVolumeImpl( + fsDataset, storageUuid, dir, conf, storageType); + default: + return new FsVolumeImpl( + fsDataset, storageUuid, dir, conf, storageType); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java index 90739c3f413..9fbc349738e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java @@ -68,7 +68,25 @@ synchronized FsVolumeImpl getNextVolume(StorageType storageType, } return blockChooser.chooseVolume(list, blockSize); } - + + /** + * Get next volume. Synchronized to ensure {@link #curVolume} is updated + * by a single thread and next volume is chosen with no concurrent + * update to {@link #volumes}. + * @param blockSize free space needed on the volume + * @return next volume to store the block in. + */ + synchronized FsVolumeImpl getNextTransientVolume( + long blockSize) throws IOException { + final List<FsVolumeImpl> list = new ArrayList<FsVolumeImpl>(volumes.size()); + for(FsVolumeImpl v : volumes) { + if (v.isTransientStorage()) { + list.add(v); + } + } + return blockChooser.chooseVolume(list, blockSize); + } + long getDfsUsed() throws IOException { long dfsUsed = 0L; for (FsVolumeImpl v : volumes) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java index 29ac3f2c473..b7fdccfb33e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java @@ -98,9 +98,10 @@ public FsDatasetChecker(DataStorage storage, Configuration conf) { */ @Override public synchronized ReplicaInPipelineInterface createRbw( - StorageType storageType, ExtendedBlock b) throws IOException { + StorageType storageType, ExtendedBlock b, boolean allowLazyPersist) + throws IOException { assertThat(b.getLocalBlock().getNumBytes(), is(EXPECTED_BLOCK_LENGTH)); - return super.createRbw(storageType, b); + return super.createRbw(storageType, b, allowLazyPersist); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java index 9ea6c5186a1..e9557da229e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java @@ -364,7 +364,7 @@ public void blockReport_04() throws IOException { // Create a bogus new block which will not be present on the namenode. 
ExtendedBlock b = new ExtendedBlock( poolId, rand.nextLong(), 1024L, rand.nextLong()); - dn.getFSDataset().createRbw(StorageType.DEFAULT, b); + dn.getFSDataset().createRbw(StorageType.DEFAULT, b, false); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 83d93f0dac4..eab599dcab9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -300,6 +300,11 @@ public void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum) { public ChunkChecksum getLastChecksumAndDataLen() { return new ChunkChecksum(oStream.getLength(), null); } + + @Override + public boolean isOnTransientStorage() { + return false; + } } /** @@ -747,7 +752,8 @@ public synchronized ReplicaInPipelineInterface recoverRbw(ExtendedBlock b, @Override // FsDatasetSpi public synchronized ReplicaInPipelineInterface createRbw( - StorageType storageType, ExtendedBlock b) throws IOException { + StorageType storageType, ExtendedBlock b, + boolean allowLazyPersist) throws IOException { return createTemporary(storageType, b); } @@ -1083,7 +1089,7 @@ public void clearRollingUpgradeMarker(String bpid) { @Override public void checkAndUpdate(String bpid, long blockId, File diskFile, - File diskMetaFile, FsVolumeSpi vol) { + File diskMetaFile, FsVolumeSpi vol) throws IOException { throw new UnsupportedOperationException(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index f627c42c619..4dfc7731d77 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -529,7 +529,7 @@ public void testNoReplicaUnderRecovery() throws IOException { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); } - dn.data.createRbw(StorageType.DEFAULT, block); + dn.data.createRbw(StorageType.DEFAULT, block, false); try { dn.syncBlock(rBlock, initBlockRecords(dn)); fail("Sync should fail"); @@ -553,7 +553,7 @@ public void testNotMatchedReplicaID() throws IOException { LOG.debug("Running " + GenericTestUtils.getMethodName()); } ReplicaInPipelineInterface replicaInfo = dn.data.createRbw( - StorageType.DEFAULT, block); + StorageType.DEFAULT, block, false); ReplicaOutputStreams streams = null; try { streams = replicaInfo.createStreams(true, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java index bc50eaac3c3..78b9e2b014d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java @@ -215,7 +215,7 @@ private long 
createBlockMetaFile() throws IOException { } private void scan(long totalBlocks, int diffsize, long missingMetaFile, long missingBlockFile, - long missingMemoryBlocks, long mismatchBlocks) { + long missingMemoryBlocks, long mismatchBlocks) throws IOException { scanner.reconcile(); assertTrue(scanner.diffs.containsKey(bpid)); @@ -431,6 +431,10 @@ public void reserveSpaceForRbw(long bytesToReserve) { @Override public void releaseReservedSpace(long bytesToRelease) { + + @Override + public boolean isTransientStorage() { + return false; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java index bd6c3de2266..099a0cdae1c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java @@ -67,7 +67,7 @@ int addSomeBlocks(SimulatedFSDataset fsdataset, int startingBlockId) // we pass expected len as zero, - fsdataset should use the sizeof actual // data written ReplicaInPipelineInterface bInfo = fsdataset.createRbw( - StorageType.DEFAULT, b); + StorageType.DEFAULT, b, false); ReplicaOutputStreams out = bInfo.createStreams(true, DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512)); try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java index 6bd36edd52a..d6d7dd7399a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java @@ -34,7 +34,7 @@ public static File getFile(FsDatasetSpi fsd, String bpid, long bid) { public static File getBlockFile(FsDatasetSpi fsd, String bpid, Block b ) throws IOException { - return ((FsDatasetImpl)fsd).getBlockFile(bpid, b); + return ((FsDatasetImpl)fsd).getBlockFile(bpid, b.getBlockId()); } public static File getMetaFile(FsDatasetSpi fsd, String bpid, Block b) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java index a870aa90da0..60c6d0304f5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java @@ -358,7 +358,7 @@ private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throw } try { - dataSet.createRbw(StorageType.DEFAULT, blocks[FINALIZED]); + dataSet.createRbw(StorageType.DEFAULT, blocks[FINALIZED], false); Assert.fail("Should not have created a replica that's already " + "finalized " + blocks[FINALIZED]); } catch (ReplicaAlreadyExistsException e) { @@ -376,7 +376,7 @@ private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throw } try { - dataSet.createRbw(StorageType.DEFAULT, blocks[TEMPORARY]); + 
dataSet.createRbw(StorageType.DEFAULT, blocks[TEMPORARY], false); Assert.fail("Should not have created a replica that had created as " + "temporary " + blocks[TEMPORARY]); } catch (ReplicaAlreadyExistsException e) { @@ -386,7 +386,7 @@ private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throw 0L, blocks[RBW].getNumBytes()); // expect to be successful try { - dataSet.createRbw(StorageType.DEFAULT, blocks[RBW]); + dataSet.createRbw(StorageType.DEFAULT, blocks[RBW], false); Assert.fail("Should not have created a replica that had created as RBW " + blocks[RBW]); } catch (ReplicaAlreadyExistsException e) { @@ -402,7 +402,7 @@ private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throw } try { - dataSet.createRbw(StorageType.DEFAULT, blocks[RWR]); + dataSet.createRbw(StorageType.DEFAULT, blocks[RWR], false); Assert.fail("Should not have created a replica that was waiting to be " + "recovered " + blocks[RWR]); } catch (ReplicaAlreadyExistsException e) { @@ -418,7 +418,7 @@ private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throw } try { - dataSet.createRbw(StorageType.DEFAULT, blocks[RUR]); + dataSet.createRbw(StorageType.DEFAULT, blocks[RUR], false); Assert.fail("Should not have created a replica that was under recovery " + blocks[RUR]); } catch (ReplicaAlreadyExistsException e) { @@ -435,7 +435,7 @@ private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throw e.getMessage().contains(ReplicaNotFoundException.NON_EXISTENT_REPLICA)); } - dataSet.createRbw(StorageType.DEFAULT, blocks[NON_EXISTENT]); + dataSet.createRbw(StorageType.DEFAULT, blocks[NON_EXISTENT], false); } private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException {
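For readers skimming the patch, a minimal, self-contained sketch of the placement logic that the createRbw() change above introduces: when allowLazyPersist is set, try a RAM-backed (transient) volume first and fall back to regular storage if no transient volume has room, rather than failing the write. This is not code from the patch. The names Volume, OutOfSpaceException, and placeReplica are hypothetical stand-ins, and the volume-choosing policy is simplified to a linear scan; the real control flow lives in FsDatasetImpl#createRbw and FsVolumeList#getNextTransientVolume.

import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative sketch only, not part of the patch above. Models the
 * "transient first, then fall back" placement added by HDFS-6925.
 */
public class LazyPersistPlacementSketch {

  /** Hypothetical stand-in for FsVolumeSpi: capacity plus a transient flag. */
  static final class Volume {
    final String name;
    final boolean transientStorage; // true when backed by RAM_DISK
    final long available;

    Volume(String name, boolean transientStorage, long available) {
      this.name = name;
      this.transientStorage = transientStorage;
      this.available = available;
    }
  }

  /** Hypothetical stand-in for DiskOutOfSpaceException. */
  static final class OutOfSpaceException extends Exception {
    OutOfSpaceException(String msg) { super(msg); }
  }

  /** Choose the first volume with enough space, optionally transient-only. */
  static Volume choose(List<Volume> volumes, long blockSize, boolean transientOnly)
      throws OutOfSpaceException {
    List<Volume> candidates = new ArrayList<Volume>();
    for (Volume v : volumes) {
      if (!transientOnly || v.transientStorage) {
        candidates.add(v);
      }
    }
    for (Volume v : candidates) {
      // Same strictly-greater space check as chooseVolume() in the patch.
      if (v.available > blockSize) {
        return v;
      }
    }
    throw new OutOfSpaceException("No volume with " + blockSize + " bytes free");
  }

  /**
   * Mirrors the control flow added to createRbw(): prefer transient storage
   * when the client asked for lazy persist, but silently fall back to the
   * regular volumes instead of failing the write.
   */
  static Volume placeReplica(List<Volume> volumes, long blockSize,
      boolean allowLazyPersist) throws OutOfSpaceException {
    while (true) {
      try {
        return choose(volumes, blockSize, allowLazyPersist);
      } catch (OutOfSpaceException e) {
        if (allowLazyPersist) {
          allowLazyPersist = false; // retry once against all volumes
          continue;
        }
        throw e; // no volume at all can hold the block
      }
    }
  }

  public static void main(String[] args) throws Exception {
    List<Volume> volumes = new ArrayList<Volume>();
    volumes.add(new Volume("ram0", true, 64L * 1024 * 1024));
    volumes.add(new Volume("disk0", false, 10L * 1024 * 1024 * 1024));

    // Small block with lazyPersist: lands on the RAM-backed volume ("ram0").
    System.out.println(placeReplica(volumes, 1L << 20, true).name);
    // Block larger than the RAM disk: falls back to "disk0" instead of failing.
    System.out.println(placeReplica(volumes, 256L << 20, true).name);
  }
}

The same fallback is why the patch catches DiskOutOfSpaceException around getNextTransientVolume(): lazy persist is treated as a hint, so exhausted RAM_DISK capacity degrades to a normal durable write rather than an error to the client.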