From d3292379906bdebc6834076e56eaa21dbc409d88 Mon Sep 17 00:00:00 2001 From: Colin Patrick Mccabe Date: Thu, 18 Sep 2014 16:46:01 -0700 Subject: [PATCH] HDFS-6727. Refresh data volumes on DataNode based on configuration changes (Lei Xu via Colin Patrick McCabe) (cherry picked from commit fe38d2e9b5ac7e13f97cd2d3d2a984ab6bbaaf77) --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../server/common/HdfsServerConstants.java | 7 +- .../hadoop/hdfs/server/common/Storage.java | 10 +- .../hadoop/hdfs/server/datanode/DataNode.java | 161 ++++++- .../datanode/fsdataset/FsDatasetSpi.java | 4 +- .../fsdataset/impl/FsDatasetImpl.java | 141 ++++-- .../server/datanode/SimulatedFSDataset.java | 3 +- .../datanode/TestDataNodeHotSwapVolumes.java | 423 ++++++++++++++++++ .../fsdataset/impl/TestFsDatasetImpl.java | 12 +- 9 files changed, 725 insertions(+), 39 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 4345daef759..f5ad06c322d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -222,6 +222,9 @@ Release 2.6.0 - UNRELEASED HDFS-7003. Add NFS Gateway support for reading and writing to encryption zones. (clamb via wang) + HDFS-6727. Refresh data volumes on DataNode based on configuration changes + (Lei Xu via cmccabe) + OPTIMIZATIONS HDFS-6690. Deduplicate xattr names in memory. (wang) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java index 106f4893d60..767c1b559f9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java @@ -94,7 +94,12 @@ public final class HdfsServerConstants { NONINTERACTIVE("-nonInteractive"), RENAMERESERVED("-renameReserved"), METADATAVERSION("-metadataVersion"), - UPGRADEONLY("-upgradeOnly"); + UPGRADEONLY("-upgradeOnly"), + // The -hotswap constant should not be used as a startup option, it is + // only used for StorageDirectory.analyzeStorage() in hot swap drive scenario. + // TODO refactor StorageDirectory.analyzeStorage() so that we can do away with + // this in StartupOption. 
+ HOTSWAP("-hotswap"); private static final Pattern ENUM_WITH_ROLLING_UPGRADE_OPTION = Pattern.compile( "(\\w+)\\((\\w+)\\)"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java index ed717348879..73ab8372a34 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java @@ -464,17 +464,20 @@ public abstract class Storage extends StorageInfo { public StorageState analyzeStorage(StartupOption startOpt, Storage storage) throws IOException { assert root != null : "root is null"; + boolean hadMkdirs = false; String rootPath = root.getCanonicalPath(); try { // check that storage exists if (!root.exists()) { // storage directory does not exist - if (startOpt != StartupOption.FORMAT) { + if (startOpt != StartupOption.FORMAT && + startOpt != StartupOption.HOTSWAP) { LOG.warn("Storage directory " + rootPath + " does not exist"); return StorageState.NON_EXISTENT; } LOG.info(rootPath + " does not exist. Creating ..."); if (!root.mkdirs()) throw new IOException("Cannot create directory " + rootPath); + hadMkdirs = true; } // or is inaccessible if (!root.isDirectory()) { @@ -492,7 +495,10 @@ public abstract class Storage extends StorageInfo { this.lock(); // lock storage if it exists - if (startOpt == HdfsServerConstants.StartupOption.FORMAT) + // If startOpt is HOTSWAP, it returns NOT_FORMATTED for empty directory, + // while it also checks the layout version. + if (startOpt == HdfsServerConstants.StartupOption.FORMAT || + (startOpt == StartupOption.HOTSWAP && hadMkdirs)) return StorageState.NOT_FORMATTED; if (startOpt != HdfsServerConstants.StartupOption.IMPORT) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index b7e07e8c631..63363577590 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -70,8 +70,10 @@ import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -80,11 +82,13 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.management.ObjectName; +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.conf.ReconfigurableBase; +import org.apache.hadoop.conf.ReconfigurationException; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; @@ -138,6 +142,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; import 
org.apache.hadoop.hdfs.server.common.JspHelper; +import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; @@ -223,7 +228,7 @@ import com.google.protobuf.BlockingService; * **********************************************************/ @InterfaceAudience.Private -public class DataNode extends Configured +public class DataNode extends ReconfigurableBase implements InterDatanodeProtocol, ClientDatanodeProtocol, DataNodeMXBean { public static final Log LOG = LogFactory.getLog(DataNode.class); @@ -308,6 +313,7 @@ public class DataNode extends Configured private JvmPauseMonitor pauseMonitor; private SecureResources secureResources = null; + // dataDirs must be accessed while holding the DataNode lock. private List dataDirs; private Configuration conf; private final String confVersion; @@ -389,6 +395,149 @@ public class DataNode extends Configured } } + @Override + public void reconfigurePropertyImpl(String property, String newVal) + throws ReconfigurationException { + if (property.equals(DFS_DATANODE_DATA_DIR_KEY)) { + try { + LOG.info("Reconfiguring " + property + " to " + newVal); + this.refreshVolumes(newVal); + } catch (Exception e) { + throw new ReconfigurationException(property, newVal, + getConf().get(property), e); + } + } else { + throw new ReconfigurationException( + property, newVal, getConf().get(property)); + } + } + + /** + * Get a list of the keys of the re-configurable properties in configuration. + */ + @Override + public Collection getReconfigurableProperties() { + List reconfigurable = + Collections.unmodifiableList(Arrays.asList(DFS_DATANODE_DATA_DIR_KEY)); + return reconfigurable; + } + + /** + * Contains the StorageLocations for changed data volumes. + */ + @VisibleForTesting + static class ChangedVolumes { + List newLocations = Lists.newArrayList(); + List deactivateLocations = Lists.newArrayList(); + } + + /** + * Parse the new DFS_DATANODE_DATA_DIR value in the configuration to detect + * changed volumes. + * @return changed volumes. + * @throws IOException if none of the directories are specified in the + * configuration. + */ + @VisibleForTesting + ChangedVolumes parseChangedVolumes() throws IOException { + List locations = getStorageLocations(getConf()); + + if (locations.isEmpty()) { + throw new IOException("No directory is specified."); + } + + ChangedVolumes results = new ChangedVolumes(); + results.newLocations.addAll(locations); + + for (Iterator it = storage.dirIterator(); + it.hasNext(); ) { + Storage.StorageDirectory dir = it.next(); + boolean found = false; + for (Iterator sl = results.newLocations.iterator(); + sl.hasNext(); ) { + if (sl.next().getFile().getCanonicalPath().equals( + dir.getRoot().getCanonicalPath())) { + sl.remove(); + found = true; + break; + } + } + + if (!found) { + results.deactivateLocations.add( + StorageLocation.parse(dir.getRoot().toString())); + } + } + + return results; + } + + /** + * Attempts to reload data volumes with new configuration. + * @param newVolumes a comma separated string that specifies the data volumes. 
+ * @throws Exception if adding or removing the volumes fails. + */ + private synchronized void refreshVolumes(String newVolumes) throws Exception { + Configuration conf = getConf(); + String oldVolumes = conf.get(DFS_DATANODE_DATA_DIR_KEY); + conf.set(DFS_DATANODE_DATA_DIR_KEY, newVolumes); + List locations = getStorageLocations(conf); + + final int numOldDataDirs = dataDirs.size(); + dataDirs = locations; + ChangedVolumes changedVolumes = parseChangedVolumes(); + + try { + if (numOldDataDirs + changedVolumes.newLocations.size() - + changedVolumes.deactivateLocations.size() <= 0) { + throw new IOException("Attempt to remove all volumes."); + } + if (!changedVolumes.newLocations.isEmpty()) { + LOG.info("Adding new volumes: " + + Joiner.on(",").join(changedVolumes.newLocations)); + + // Add volumes for each namespace. + for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) { + NamespaceInfo nsInfo = bpos.getNamespaceInfo(); + LOG.info("Loading volumes for namespace: " + nsInfo.getNamespaceID()); + storage.addStorageLocations( + this, nsInfo, changedVolumes.newLocations, StartupOption.HOTSWAP); + } + List bpids = Lists.newArrayList(); + for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) { + bpids.add(bpos.getBlockPoolId()); + } + List succeedVolumes = + data.addVolumes(changedVolumes.newLocations, bpids); + + if (succeedVolumes.size() < changedVolumes.newLocations.size()) { + List failedVolumes = Lists.newArrayList(); + // Clean up all failed volumes. + for (StorageLocation location : changedVolumes.newLocations) { + if (!succeedVolumes.contains(location)) { + failedVolumes.add(location); + } + } + storage.removeVolumes(failedVolumes); + data.removeVolumes(failedVolumes); + } + } + + if (!changedVolumes.deactivateLocations.isEmpty()) { + LOG.info("Deactivating volumes: " + + Joiner.on(",").join(changedVolumes.deactivateLocations)); + + data.removeVolumes(changedVolumes.deactivateLocations); + storage.removeVolumes(changedVolumes.deactivateLocations); + } + } catch (IOException e) { + LOG.warn("There was an IOException when refreshing volumes! 
" + + "Recover configurations: " + DFS_DATANODE_DATA_DIR_KEY + + " = " + oldVolumes, e); + throw e; + } + } + private synchronized void setClusterId(final String nsCid, final String bpid ) throws IOException { if(clusterId != null && !clusterId.equals(nsCid)) { @@ -829,7 +978,9 @@ public class DataNode extends Configured // settings global for all BPs in the Data Node this.secureResources = resources; - this.dataDirs = dataDirs; + synchronized (this) { + this.dataDirs = dataDirs; + } this.conf = conf; this.dnConf = new DNConf(conf); this.spanReceiverHost = SpanReceiverHost.getInstance(conf); @@ -1119,7 +1270,9 @@ public class DataNode extends Configured } final String bpid = nsInfo.getBlockPoolID(); //read storage info, lock data dirs and transition fs state if necessary - storage.recoverTransitionRead(this, bpid, nsInfo, dataDirs, startOpt); + synchronized (this) { + storage.recoverTransitionRead(this, bpid, nsInfo, dataDirs, startOpt); + } final StorageInfo bpStorage = storage.getBPStorage(bpid); LOG.info("Setting up storage: nsid=" + bpStorage.getNamespaceID() + ";bpid=" + bpid + ";lv=" + storage.getLayoutVersion() diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index 553208eeafd..4c03151e3b1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -94,8 +94,8 @@ public interface FsDatasetSpi extends FSDatasetMBean { public List getVolumes(); /** Add an array of StorageLocation to FsDataset. */ - public void addVolumes(Collection volumes) - throws IOException; + public List addVolumes(List volumes, + final Collection bpids); /** Removes a collection of volumes from FsDataset. 
*/ public void removeVolumes(Collection volumes); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 4e93772b637..1584a96b657 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -28,19 +28,23 @@ import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import javax.management.NotCompliantMBeanException; import javax.management.ObjectName; import javax.management.StandardMBean; +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -85,6 +89,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlo import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.StorageReport; +import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.util.DataChecksum; @@ -245,7 +250,7 @@ class FsDatasetImpl implements FsDatasetSpi { + ", volume failures tolerated: " + volFailuresTolerated); } - storageMap = new HashMap(); + storageMap = new ConcurrentHashMap(); volumeMap = new ReplicaMap(this); @SuppressWarnings("unchecked") final VolumeChoosingPolicy blockChooserImpl = @@ -275,45 +280,124 @@ class FsDatasetImpl implements FsDatasetSpi { // storageMap and asyncDiskService, consistent. 
FsVolumeImpl fsVolume = new FsVolumeImpl( this, sd.getStorageUuid(), dir, this.conf, storageType); - fsVolume.getVolumeMap(volumeMap); + ReplicaMap tempVolumeMap = new ReplicaMap(this); + fsVolume.getVolumeMap(tempVolumeMap); + volumeMap.addAll(tempVolumeMap); volumes.addVolume(fsVolume); storageMap.put(sd.getStorageUuid(), new DatanodeStorage(sd.getStorageUuid(), - DatanodeStorage.State.NORMAL, - storageType)); + DatanodeStorage.State.NORMAL, + storageType)); asyncDiskService.addVolume(sd.getCurrentDir()); LOG.info("Added volume - " + dir + ", StorageType: " + storageType); } + private void addVolumeAndBlockPool(Collection dataLocations, + Storage.StorageDirectory sd, final Collection bpids) + throws IOException { + final File dir = sd.getCurrentDir(); + final StorageType storageType = + getStorageTypeFromLocations(dataLocations, sd.getRoot()); + + final FsVolumeImpl fsVolume = new FsVolumeImpl( + this, sd.getStorageUuid(), dir, this.conf, storageType); + final ReplicaMap tempVolumeMap = new ReplicaMap(fsVolume); + + List exceptions = Lists.newArrayList(); + for (final String bpid : bpids) { + try { + fsVolume.addBlockPool(bpid, this.conf); + fsVolume.getVolumeMap(bpid, tempVolumeMap); + } catch (IOException e) { + LOG.warn("Caught exception when adding " + fsVolume + + ". Will throw later.", e); + exceptions.add(e); + } + } + if (!exceptions.isEmpty()) { + // The state of FsDatasetImpl is not modified here, so there is no need + // to roll back. + throw MultipleIOException.createIOException(exceptions); + } + + volumeMap.addAll(tempVolumeMap); + storageMap.put(sd.getStorageUuid(), + new DatanodeStorage(sd.getStorageUuid(), + DatanodeStorage.State.NORMAL, + storageType)); + asyncDiskService.addVolume(sd.getCurrentDir()); + volumes.addVolume(fsVolume); + + LOG.info("Added volume - " + dir + ", StorageType: " + storageType); + } + /** * Add an array of StorageLocation to FsDataset. * * @pre dataStorage must have these volumes. - * @param volumes - * @throws IOException + * @param volumes a list of storage locations for the volumes to add. + * @param bpids block pool IDs. + * @return the list of successfully loaded volumes. 
*/ @Override - public synchronized void addVolumes(Collection volumes) - throws IOException { + public synchronized List addVolumes( + final List volumes, final Collection bpids) { final Collection dataLocations = DataNode.getStorageLocations(this.conf); - Map allStorageDirs = + final Map allStorageDirs = new HashMap(); - for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) { - Storage.StorageDirectory sd = dataStorage.getStorageDir(idx); - allStorageDirs.put(sd.getRoot().getAbsolutePath(), sd); + List succeedVolumes = Lists.newArrayList(); + try { + for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) { + Storage.StorageDirectory sd = dataStorage.getStorageDir(idx); + allStorageDirs.put(sd.getRoot().getCanonicalPath(), sd); + } + } catch (IOException ioe) { + LOG.warn("Caught exception when parsing storage URL.", ioe); + return succeedVolumes; } - for (StorageLocation vol : volumes) { - String key = vol.getFile().getAbsolutePath(); - if (!allStorageDirs.containsKey(key)) { - LOG.warn("Attempt to add an invalid volume: " + vol.getFile()); - } else { - addVolume(dataLocations, allStorageDirs.get(key)); + final boolean[] successFlags = new boolean[volumes.size()]; + Arrays.fill(successFlags, false); + List volumeAddingThreads = Lists.newArrayList(); + for (int i = 0; i < volumes.size(); i++) { + final int idx = i; + Thread t = new Thread() { + public void run() { + StorageLocation vol = volumes.get(idx); + try { + String key = vol.getFile().getCanonicalPath(); + if (!allStorageDirs.containsKey(key)) { + LOG.warn("Attempt to add an invalid volume: " + vol.getFile()); + } else { + addVolumeAndBlockPool(dataLocations, allStorageDirs.get(key), + bpids); + successFlags[idx] = true; + } + } catch (IOException e) { + LOG.warn("Caught exception when adding volume " + vol, e); + } + } + }; + volumeAddingThreads.add(t); + t.start(); + } + + for (Thread t : volumeAddingThreads) { + try { + t.join(); + } catch (InterruptedException e) { + LOG.warn("Caught InterruptedException when adding volume.", e); } } + + for (int i = 0; i < volumes.size(); i++) { + if (successFlags[i]) { + succeedVolumes.add(volumes.get(i)); + } + } + return succeedVolumes; } /** @@ -335,9 +419,9 @@ class FsDatasetImpl implements FsDatasetSpi { String volume = sd.getRoot().toString(); LOG.info("Removing " + volume + " from FsDataset."); - this.volumes.removeVolume(volume); - storageMap.remove(sd.getStorageUuid()); + // Disable the volume from the service. asyncDiskService.removeVolume(sd.getCurrentDir()); + this.volumes.removeVolume(volume); // Removed all replica information for the blocks on the volume. Unlike // updating the volumeMap in addVolume(), this operation does not scan @@ -348,7 +432,7 @@ class FsDatasetImpl implements FsDatasetSpi { it.hasNext(); ) { ReplicaInfo block = it.next(); if (block.getVolume().getBasePath().equals(volume)) { - invalidate(bpid, block.getBlockId()); + invalidate(bpid, block); blocks.add(block); it.remove(); } @@ -357,6 +441,8 @@ class FsDatasetImpl implements FsDatasetSpi { datanode.getBlockScanner().deleteBlocks(bpid, blocks.toArray(new Block[blocks.size()])); } + + storageMap.remove(sd.getStorageUuid()); } } } @@ -1345,23 +1431,26 @@ class FsDatasetImpl implements FsDatasetSpi { /** * Invalidate a block but does not delete the actual on-disk block file. * - * It should only be used for decommissioning disks. + * It should only be used when deactivating disks. * * @param bpid the block pool ID. - * @param blockId the ID of the block. 
+ * @param block The block to be invalidated. */ - public void invalidate(String bpid, long blockId) { + public void invalidate(String bpid, ReplicaInfo block) { // If a DFSClient has the replica in its cache of short-circuit file // descriptors (and the client is using ShortCircuitShm), invalidate it. // The short-circuit registry is null in the unit tests, because the // datanode is mock object. if (datanode.getShortCircuitRegistry() != null) { datanode.getShortCircuitRegistry().processBlockInvalidation( - new ExtendedBlockId(blockId, bpid)); + new ExtendedBlockId(block.getBlockId(), bpid)); // If the block is cached, start uncaching it. - cacheManager.uncacheBlock(bpid, blockId); + cacheManager.uncacheBlock(bpid, block.getBlockId()); } + + datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid, block), + block.getStorageUuid()); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index d0fad6e2b85..83d93f0dac4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -1093,7 +1093,8 @@ public class SimulatedFSDataset implements FsDatasetSpi { } @Override - public void addVolumes(Collection volumes) { + public List addVolumes(List volumes, + final Collection bpids) { throw new UnsupportedOperationException(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java new file mode 100644 index 00000000000..d2b29957256 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java @@ -0,0 +1,423 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.server.datanode; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.ReconfigurationException; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.BlockMissingException; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.After; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeoutException; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class TestDataNodeHotSwapVolumes { + private static final int BLOCK_SIZE = 512; + private MiniDFSCluster cluster; + + @After + public void tearDown() { + shutdown(); + } + + private void startDFSCluster(int numNameNodes, int numDataNodes) + throws IOException { + shutdown(); + Configuration conf = new Configuration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + + /* + * Lower the DN heartbeat, DF rate, and recheck interval to one second + * so state about failures and datanode death propagates faster. + */ + conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); + conf.setInt(DFSConfigKeys.DFS_DF_INTERVAL_KEY, 1000); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, + 1000); + + MiniDFSNNTopology nnTopology = + MiniDFSNNTopology.simpleFederatedTopology(numNameNodes); + + cluster = new MiniDFSCluster.Builder(conf) + .nnTopology(nnTopology) + .numDataNodes(numDataNodes) + .build(); + cluster.waitActive(); + } + + private void shutdown() { + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + } + + private void createFile(Path path, int numBlocks) + throws IOException, InterruptedException, TimeoutException { + final short replicateFactor = 1; + createFile(path, numBlocks, replicateFactor); + } + + private void createFile(Path path, int numBlocks, short replicateFactor) + throws IOException, InterruptedException, TimeoutException { + createFile(0, path, numBlocks, replicateFactor); + } + + private void createFile(int fsIdx, Path path, int numBlocks) + throws IOException, InterruptedException, TimeoutException { + final short replicateFactor = 1; + createFile(fsIdx, path, numBlocks, replicateFactor); + } + + private void createFile(int fsIdx, Path path, int numBlocks, + short replicateFactor) + throws IOException, TimeoutException, InterruptedException { + final int seed = 0; + final DistributedFileSystem fs = cluster.getFileSystem(fsIdx); + DFSTestUtil.createFile(fs, path, BLOCK_SIZE * numBlocks, + replicateFactor, seed); + DFSTestUtil.waitReplication(fs, path, replicateFactor); + } + + /** + * Verify whether a file has enough content. 
+ */ + private static void verifyFileLength(FileSystem fs, Path path, int numBlocks) + throws IOException { + FileStatus status = fs.getFileStatus(path); + assertEquals(numBlocks * BLOCK_SIZE, status.getLen()); + } + + /** Return the number of replicas for a given block in the file. */ + private static int getNumReplicas(FileSystem fs, Path file, + int blockIdx) throws IOException { + BlockLocation locs[] = fs.getFileBlockLocations(file, 0, Long.MAX_VALUE); + return locs.length < blockIdx + 1 ? 0 : locs[blockIdx].getNames().length; + } + + /** + * Wait the block to have the exact number of replicas as expected. + */ + private static void waitReplication(FileSystem fs, Path file, int blockIdx, + int numReplicas) + throws IOException, TimeoutException, InterruptedException { + int attempts = 50; // Wait 5 seconds. + while (attempts > 0) { + if (getNumReplicas(fs, file, blockIdx) == numReplicas) { + return; + } + Thread.sleep(100); + attempts--; + } + throw new TimeoutException("Timed out waiting the " + blockIdx + "-th block" + + " of " + file + " to have " + numReplicas + " replicas."); + } + + /** Parses data dirs from DataNode's configuration. */ + private static Collection getDataDirs(DataNode datanode) { + return datanode.getConf().getTrimmedStringCollection( + DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY); + } + + @Test + public void testParseChangedVolumes() throws IOException { + startDFSCluster(1, 1); + DataNode dn = cluster.getDataNodes().get(0); + Configuration conf = dn.getConf(); + + String oldPaths = conf.get(DFS_DATANODE_DATA_DIR_KEY); + List oldLocations = new ArrayList(); + for (String path : oldPaths.split(",")) { + oldLocations.add(StorageLocation.parse(path)); + } + assertFalse(oldLocations.isEmpty()); + + String newPaths = "/foo/path1,/foo/path2"; + conf.set(DFS_DATANODE_DATA_DIR_KEY, newPaths); + + DataNode.ChangedVolumes changedVolumes =dn.parseChangedVolumes(); + List newVolumes = changedVolumes.newLocations; + assertEquals(2, newVolumes.size()); + assertEquals("/foo/path1", newVolumes.get(0).getFile().getAbsolutePath()); + assertEquals("/foo/path2", newVolumes.get(1).getFile().getAbsolutePath()); + + List removedVolumes = changedVolumes.deactivateLocations; + assertEquals(oldLocations.size(), removedVolumes.size()); + for (int i = 0; i < removedVolumes.size(); i++) { + assertEquals(oldLocations.get(i).getFile(), + removedVolumes.get(i).getFile()); + } + } + + @Test + public void testParseChangedVolumesFailures() throws IOException { + startDFSCluster(1, 1); + DataNode dn = cluster.getDataNodes().get(0); + Configuration conf = dn.getConf(); + try { + conf.set(DFS_DATANODE_DATA_DIR_KEY, ""); + dn.parseChangedVolumes(); + fail("Should throw IOException: empty inputs."); + } catch (IOException e) { + GenericTestUtils.assertExceptionContains("No directory is specified.", e); + } + } + + /** Add volumes to the first DataNode. */ + private void addVolumes(int numNewVolumes) throws ReconfigurationException { + File dataDir = new File(cluster.getDataDirectory()); + DataNode dn = cluster.getDataNodes().get(0); // First DataNode. + Configuration conf = dn.getConf(); + String oldDataDir = conf.get(DFS_DATANODE_DATA_DIR_KEY); + + List newVolumeDirs = new ArrayList(); + StringBuilder newDataDirBuf = new StringBuilder(oldDataDir); + int startIdx = oldDataDir.split(",").length + 1; + // Find the first available (non-taken) directory name for data volume. 
+ while (true) { + File volumeDir = new File(dataDir, "data" + startIdx); + if (!volumeDir.exists()) { + break; + } + startIdx++; + } + for (int i = startIdx; i < startIdx + numNewVolumes; i++) { + File volumeDir = new File(dataDir, "data" + String.valueOf(i)); + newVolumeDirs.add(volumeDir); + volumeDir.mkdirs(); + newDataDirBuf.append(","); + newDataDirBuf.append(volumeDir.toURI()); + } + + String newDataDir = newDataDirBuf.toString(); + dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newDataDir); + assertEquals(newDataDir, conf.get(DFS_DATANODE_DATA_DIR_KEY)); + + // Check that all newly created volumes are appropriately formatted. + for (File volumeDir : newVolumeDirs) { + File curDir = new File(volumeDir, "current"); + assertTrue(curDir.exists()); + assertTrue(curDir.isDirectory()); + } + } + + private List> getNumBlocksReport(int namesystemIdx) { + List> results = new ArrayList>(); + final String bpid = cluster.getNamesystem(namesystemIdx).getBlockPoolId(); + List> blockReports = + cluster.getAllBlockReports(bpid); + for (Map datanodeReport : blockReports) { + List numBlocksPerDN = new ArrayList(); + for (BlockListAsLongs blocks : datanodeReport.values()) { + numBlocksPerDN.add(blocks.getNumberOfBlocks()); + } + results.add(numBlocksPerDN); + } + return results; + } + + /** + * Test adding one volume on a running MiniDFSCluster with only one NameNode. + */ + @Test + public void testAddOneNewVolume() + throws IOException, ReconfigurationException, + InterruptedException, TimeoutException { + startDFSCluster(1, 1); + String bpid = cluster.getNamesystem().getBlockPoolId(); + final int numBlocks = 10; + + addVolumes(1); + + Path testFile = new Path("/test"); + createFile(testFile, numBlocks); + + List> blockReports = + cluster.getAllBlockReports(bpid); + assertEquals(1, blockReports.size()); // 1 DataNode + assertEquals(3, blockReports.get(0).size()); // 3 volumes + + // FSVolumeList uses Round-Robin block chooser by default. Thus the new + // blocks should be evenly distributed across all volumes. + int minNumBlocks = Integer.MAX_VALUE; + int maxNumBlocks = Integer.MIN_VALUE; + for (BlockListAsLongs blockList : blockReports.get(0).values()) { + minNumBlocks = Math.min(minNumBlocks, blockList.getNumberOfBlocks()); + maxNumBlocks = Math.max(maxNumBlocks, blockList.getNumberOfBlocks()); + } + assertTrue(Math.abs(maxNumBlocks - minNumBlocks) <= 1); + verifyFileLength(cluster.getFileSystem(), testFile, numBlocks); + } + + @Test(timeout = 60000) + public void testAddVolumesDuringWrite() + throws IOException, InterruptedException, TimeoutException, + ReconfigurationException { + startDFSCluster(1, 1); + String bpid = cluster.getNamesystem().getBlockPoolId(); + Path testFile = new Path("/test"); + createFile(testFile, 4); // Each volume has 2 blocks. + + addVolumes(2); + + // Continue to write the same file, thus the new volumes will have blocks. + DFSTestUtil.appendFile(cluster.getFileSystem(), testFile, BLOCK_SIZE * 8); + verifyFileLength(cluster.getFileSystem(), testFile, 8 + 4); + // After appending data, there should be [2, 2, 4, 4] blocks in each volume + // respectively. 
+ List expectedNumBlocks = Arrays.asList(2, 2, 4, 4); + + List> blockReports = + cluster.getAllBlockReports(bpid); + assertEquals(1, blockReports.size()); // 1 DataNode + assertEquals(4, blockReports.get(0).size()); // 4 volumes + Map dnReport = + blockReports.get(0); + List actualNumBlocks = new ArrayList(); + for (BlockListAsLongs blockList : dnReport.values()) { + actualNumBlocks.add(blockList.getNumberOfBlocks()); + } + Collections.sort(actualNumBlocks); + assertEquals(expectedNumBlocks, actualNumBlocks); + } + + @Test + public void testAddVolumesToFederationNN() + throws IOException, TimeoutException, InterruptedException, + ReconfigurationException { + // Starts a cluster with 2 NameNodes and 1 DataNode. The DataNode has 2 + // volumes. + final int numNameNodes = 2; + final int numDataNodes = 1; + startDFSCluster(numNameNodes, numDataNodes); + Path testFile = new Path("/test"); + // Create a file on the first namespace with 4 blocks. + createFile(0, testFile, 4); + // Create a file on the second namespace with 4 blocks. + createFile(1, testFile, 4); + + // Add 2 volumes to the first DataNode. + final int numNewVolumes = 2; + addVolumes(numNewVolumes); + + // Append to the file on the first namespace. + DFSTestUtil.appendFile(cluster.getFileSystem(0), testFile, BLOCK_SIZE * 8); + + List> actualNumBlocks = getNumBlocksReport(0); + assertEquals(cluster.getDataNodes().size(), actualNumBlocks.size()); + List blocksOnFirstDN = actualNumBlocks.get(0); + Collections.sort(blocksOnFirstDN); + assertEquals(Arrays.asList(2, 2, 4, 4), blocksOnFirstDN); + + // Verify the second namespace also has the new volumes and they are empty. + actualNumBlocks = getNumBlocksReport(1); + assertEquals(4, actualNumBlocks.get(0).size()); + assertEquals(numNewVolumes, + Collections.frequency(actualNumBlocks.get(0), 0)); + } + + @Test + public void testRemoveOneVolume() + throws ReconfigurationException, InterruptedException, TimeoutException, + IOException { + startDFSCluster(1, 1); + final short replFactor = 1; + Path testFile = new Path("/test"); + createFile(testFile, 10, replFactor); + + DataNode dn = cluster.getDataNodes().get(0); + Collection oldDirs = getDataDirs(dn); + String newDirs = oldDirs.iterator().next(); // Keep the first volume. + dn.reconfigurePropertyImpl( + DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs); + dn.scheduleAllBlockReport(0); + + try { + DFSTestUtil.readFile(cluster.getFileSystem(), testFile); + fail("Expect to throw BlockMissingException."); + } catch (BlockMissingException e) { + GenericTestUtils.assertExceptionContains("Could not obtain block", e); + } + + Path newFile = new Path("/newFile"); + createFile(newFile, 6); + + String bpid = cluster.getNamesystem().getBlockPoolId(); + List> blockReports = + cluster.getAllBlockReports(bpid); + assertEquals((int)replFactor, blockReports.size()); + + BlockListAsLongs blocksForVolume1 = + blockReports.get(0).values().iterator().next(); + // The first volume has half of the blocks of testFile and all of newFile. 
+ assertEquals(10 / 2 + 6, blocksForVolume1.getNumberOfBlocks()); + } + + @Test + public void testReplicatingAfterRemoveVolume() + throws InterruptedException, TimeoutException, IOException, + ReconfigurationException { + startDFSCluster(1, 2); + final DistributedFileSystem fs = cluster.getFileSystem(); + final short replFactor = 2; + Path testFile = new Path("/test"); + createFile(testFile, 4, replFactor); + + DataNode dn = cluster.getDataNodes().get(0); + Collection oldDirs = getDataDirs(dn); + String newDirs = oldDirs.iterator().next(); // Keep the first volume. + dn.reconfigurePropertyImpl( + DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs); + + // Force DataNode to report missing blocks. + dn.scheduleAllBlockReport(0); + DataNodeTestUtils.triggerDeletionReport(dn); + + // The 2nd block only has 1 replica due to the removed data volume. + waitReplication(fs, testFile, 1, 1); + + // Wait for the NameNode to replicate the missing blocks. + DFSTestUtil.waitReplication(fs, testFile, replFactor); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java index 2c4c401205e..10b9f7e30a9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java @@ -40,7 +40,10 @@ import org.mockito.Mockito; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -117,6 +120,7 @@ public class TestFsDatasetImpl { final int numExistingVolumes = dataset.getVolumes().size(); final int totalVolumes = numNewVolumes + numExistingVolumes; List newLocations = new ArrayList(); + Set expectedVolumes = new HashSet(); for (int i = 0; i < numNewVolumes; i++) { String path = BASE_DIR + "/newData" + i; newLocations.add(StorageLocation.parse(path)); + expectedVolumes.add(path); @@ -125,13 +129,15 @@ public class TestFsDatasetImpl { } when(storage.getNumStorageDirs()).thenReturn(totalVolumes); - dataset.addVolumes(newLocations); + dataset.addVolumes(newLocations, Arrays.asList(BLOCK_POOL_IDS)); assertEquals(totalVolumes, dataset.getVolumes().size()); assertEquals(totalVolumes, dataset.storageMap.size()); + + Set actualVolumes = new HashSet(); for (int i = 0; i < numNewVolumes; i++) { - assertEquals(newLocations.get(i).getFile().getPath(), - dataset.getVolumes().get(numExistingVolumes + i).getBasePath()); + actualVolumes.add( + dataset.getVolumes().get(numExistingVolumes + i).getBasePath()); + } + assertEquals(actualVolumes, expectedVolumes); } @Test
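
For illustration, the following standalone sketch shows the volume-diffing idea behind DataNode#parseChangedVolumes in this patch: the directories requested by a new dfs.datanode.data.dir value are compared against the currently mounted storage directories by canonical path, yielding the volumes to add and the volumes to deactivate. The class and method names below (VolumeDiffSketch, diff) are hypothetical, and the sketch works on plain path strings rather than StorageLocation objects.

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class VolumeDiffSketch {

  /** Result of comparing the old and new data directory lists. */
  static class ChangedVolumes {
    final List<String> newLocations = new ArrayList<String>();
    final List<String> deactivateLocations = new ArrayList<String>();
  }

  /**
   * Compare the currently mounted directories against a new comma-separated
   * dfs.datanode.data.dir value. Paths are canonicalized so that entries such
   * as "/data/1" and "/data/1/" refer to the same volume.
   */
  static ChangedVolumes diff(List<String> currentDirs, String newValue)
      throws IOException {
    if (newValue == null || newValue.trim().isEmpty()) {
      throw new IOException("No directory is specified.");
    }
    ChangedVolumes result = new ChangedVolumes();
    // Start by treating every requested directory as new.
    for (String dir : newValue.split(",")) {
      result.newLocations.add(new File(dir.trim()).getCanonicalPath());
    }
    // A mounted directory that is still requested is unchanged; a mounted
    // directory that is no longer requested must be deactivated.
    for (String current : currentDirs) {
      String canonical = new File(current).getCanonicalPath();
      if (!result.newLocations.remove(canonical)) {
        result.deactivateLocations.add(canonical);
      }
    }
    return result;
  }

  public static void main(String[] args) throws IOException {
    List<String> current = new ArrayList<String>();
    current.add("/data/1");
    current.add("/data/2");
    ChangedVolumes cv = diff(current, "/data/2,/data/3");
    System.out.println("to add: " + cv.newLocations);           // expect [/data/3]
    System.out.println("to remove: " + cv.deactivateLocations); // expect [/data/1]
  }
}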
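A second standalone sketch, again with hypothetical names, illustrates the concurrency pattern that FsDatasetImpl#addVolumes introduces in this patch: each new volume is attempted in its own thread, a per-volume success flag is recorded, and only the volumes that loaded successfully are returned so that the caller (refreshVolumes above) can roll back the failures. The VolumeTask interface here is a stand-in for the real volume-loading work.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ParallelAddSketch {

  interface VolumeTask {
    void run(String volume) throws Exception;
  }

  /** Try every volume in its own thread and return only the successful ones. */
  static List<String> addAll(final List<String> volumes, final VolumeTask task)
      throws InterruptedException {
    final boolean[] successFlags = new boolean[volumes.size()];
    Arrays.fill(successFlags, false);
    List<Thread> threads = new ArrayList<Thread>();
    for (int i = 0; i < volumes.size(); i++) {
      final int idx = i;
      Thread t = new Thread() {
        public void run() {
          try {
            task.run(volumes.get(idx));
            successFlags[idx] = true;   // only set when the attempt succeeded
          } catch (Exception e) {
            // A failed volume is simply left out of the result; the caller
            // decides how to roll back.
          }
        }
      };
      threads.add(t);
      t.start();
    }
    for (Thread t : threads) {
      t.join();                         // wait for every attempt to finish
    }
    List<String> succeeded = new ArrayList<String>();
    for (int i = 0; i < volumes.size(); i++) {
      if (successFlags[i]) {
        succeeded.add(volumes.get(i));
      }
    }
    return succeeded;
  }

  public static void main(String[] args) throws InterruptedException {
    List<String> volumes = Arrays.asList("/data/3", "/bad/volume", "/data/4");
    List<String> ok = addAll(volumes, new VolumeTask() {
      public void run(String volume) throws Exception {
        if (volume.startsWith("/bad")) {
          throw new Exception("simulated failure for " + volume);
        }
      }
    });
    System.out.println("succeeded: " + ok);  // expect [/data/3, /data/4]
  }
}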