HDFS-6727. Refresh data volumes on DataNode based on configuration changes (Lei Xu via Colin Patrick McCabe)

This commit is contained in:
Colin Patrick Mccabe 2014-09-18 16:46:01 -07:00
parent 5d01a684a3
commit fe38d2e9b5
9 changed files with 725 additions and 39 deletions

View File

@ -559,6 +559,9 @@ Release 2.6.0 - UNRELEASED
HDFS-7003. Add NFS Gateway support for reading and writing to
encryption zones. (clamb via wang)
HDFS-6727. Refresh data volumes on DataNode based on configuration changes
(Lei Xu via cmccabe)
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)

View File

@ -94,7 +94,12 @@ public final class HdfsServerConstants {
NONINTERACTIVE("-nonInteractive"),
RENAMERESERVED("-renameReserved"),
METADATAVERSION("-metadataVersion"),
UPGRADEONLY("-upgradeOnly");
UPGRADEONLY("-upgradeOnly"),
// The -hotswap constant should not be used as a startup option, it is
// only used for StorageDirectory.analyzeStorage() in hot swap drive scenario.
// TODO refactor StorageDirectory.analyzeStorage() so that we can do away with
// this in StartupOption.
HOTSWAP("-hotswap");
private static final Pattern ENUM_WITH_ROLLING_UPGRADE_OPTION = Pattern.compile(
"(\\w+)\\((\\w+)\\)");

View File

@ -464,17 +464,20 @@ public abstract class Storage extends StorageInfo {
public StorageState analyzeStorage(StartupOption startOpt, Storage storage)
throws IOException {
assert root != null : "root is null";
boolean hadMkdirs = false;
String rootPath = root.getCanonicalPath();
try { // check that storage exists
if (!root.exists()) {
// storage directory does not exist
if (startOpt != StartupOption.FORMAT) {
if (startOpt != StartupOption.FORMAT &&
startOpt != StartupOption.HOTSWAP) {
LOG.warn("Storage directory " + rootPath + " does not exist");
return StorageState.NON_EXISTENT;
}
LOG.info(rootPath + " does not exist. Creating ...");
if (!root.mkdirs())
throw new IOException("Cannot create directory " + rootPath);
hadMkdirs = true;
}
// or is inaccessible
if (!root.isDirectory()) {
@ -492,7 +495,10 @@ public abstract class Storage extends StorageInfo {
this.lock(); // lock storage if it exists
if (startOpt == HdfsServerConstants.StartupOption.FORMAT)
// If startOpt is HOTSWAP, it returns NOT_FORMATTED for empty directory,
// while it also checks the layout version.
if (startOpt == HdfsServerConstants.StartupOption.FORMAT ||
(startOpt == StartupOption.HOTSWAP && hadMkdirs))
return StorageState.NOT_FORMATTED;
if (startOpt != HdfsServerConstants.StartupOption.IMPORT) {

View File

@ -70,8 +70,10 @@ import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -80,11 +82,13 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.management.ObjectName;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.ReconfigurableBase;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
@ -137,6 +141,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@ -220,7 +225,7 @@ import com.google.protobuf.BlockingService;
*
**********************************************************/
@InterfaceAudience.Private
public class DataNode extends Configured
public class DataNode extends ReconfigurableBase
implements InterDatanodeProtocol, ClientDatanodeProtocol,
DataNodeMXBean {
public static final Log LOG = LogFactory.getLog(DataNode.class);
@ -305,6 +310,7 @@ public class DataNode extends Configured
private JvmPauseMonitor pauseMonitor;
private SecureResources secureResources = null;
// dataDirs must be accessed while holding the DataNode lock.
private List<StorageLocation> dataDirs;
private Configuration conf;
private final String confVersion;
@ -386,6 +392,149 @@ public class DataNode extends Configured
}
}
@Override
public void reconfigurePropertyImpl(String property, String newVal)
throws ReconfigurationException {
if (property.equals(DFS_DATANODE_DATA_DIR_KEY)) {
try {
LOG.info("Reconfiguring " + property + " to " + newVal);
this.refreshVolumes(newVal);
} catch (Exception e) {
throw new ReconfigurationException(property, newVal,
getConf().get(property), e);
}
} else {
throw new ReconfigurationException(
property, newVal, getConf().get(property));
}
}
/**
* Get a list of the keys of the re-configurable properties in configuration.
*/
@Override
public Collection<String> getReconfigurableProperties() {
List<String> reconfigurable =
Collections.unmodifiableList(Arrays.asList(DFS_DATANODE_DATA_DIR_KEY));
return reconfigurable;
}
/**
* Contains the StorageLocations for changed data volumes.
*/
@VisibleForTesting
static class ChangedVolumes {
List<StorageLocation> newLocations = Lists.newArrayList();
List<StorageLocation> deactivateLocations = Lists.newArrayList();
}
/**
* Parse the new DFS_DATANODE_DATA_DIR value in the configuration to detect
* changed volumes.
* @return changed volumes.
* @throws IOException if none of the directories are specified in the
* configuration.
*/
@VisibleForTesting
ChangedVolumes parseChangedVolumes() throws IOException {
List<StorageLocation> locations = getStorageLocations(getConf());
if (locations.isEmpty()) {
throw new IOException("No directory is specified.");
}
ChangedVolumes results = new ChangedVolumes();
results.newLocations.addAll(locations);
for (Iterator<Storage.StorageDirectory> it = storage.dirIterator();
it.hasNext(); ) {
Storage.StorageDirectory dir = it.next();
boolean found = false;
for (Iterator<StorageLocation> sl = results.newLocations.iterator();
sl.hasNext(); ) {
if (sl.next().getFile().getCanonicalPath().equals(
dir.getRoot().getCanonicalPath())) {
sl.remove();
found = true;
break;
}
}
if (!found) {
results.deactivateLocations.add(
StorageLocation.parse(dir.getRoot().toString()));
}
}
return results;
}
/**
* Attempts to reload data volumes with new configuration.
* @param newVolumes a comma separated string that specifies the data volumes.
* @throws Exception
*/
private synchronized void refreshVolumes(String newVolumes) throws Exception {
Configuration conf = getConf();
String oldVolumes = conf.get(DFS_DATANODE_DATA_DIR_KEY);
conf.set(DFS_DATANODE_DATA_DIR_KEY, newVolumes);
List<StorageLocation> locations = getStorageLocations(conf);
final int numOldDataDirs = dataDirs.size();
dataDirs = locations;
ChangedVolumes changedVolumes = parseChangedVolumes();
try {
if (numOldDataDirs + changedVolumes.newLocations.size() -
changedVolumes.deactivateLocations.size() <= 0) {
throw new IOException("Attempt to remove all volumes.");
}
if (!changedVolumes.newLocations.isEmpty()) {
LOG.info("Adding new volumes: " +
Joiner.on(",").join(changedVolumes.newLocations));
// Add volumes for each Namespace
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
NamespaceInfo nsInfo = bpos.getNamespaceInfo();
LOG.info("Loading volumes for namesapce: " + nsInfo.getNamespaceID());
storage.addStorageLocations(
this, nsInfo, changedVolumes.newLocations, StartupOption.HOTSWAP);
}
List<String> bpids = Lists.newArrayList();
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
bpids.add(bpos.getBlockPoolId());
}
List<StorageLocation> succeedVolumes =
data.addVolumes(changedVolumes.newLocations, bpids);
if (succeedVolumes.size() < changedVolumes.newLocations.size()) {
List<StorageLocation> failedVolumes = Lists.newArrayList();
// Clean all failed volumes.
for (StorageLocation location : changedVolumes.newLocations) {
if (!succeedVolumes.contains(location)) {
failedVolumes.add(location);
}
}
storage.removeVolumes(failedVolumes);
data.removeVolumes(failedVolumes);
}
}
if (!changedVolumes.deactivateLocations.isEmpty()) {
LOG.info("Deactivating volumes: " +
Joiner.on(",").join(changedVolumes.deactivateLocations));
data.removeVolumes(changedVolumes.deactivateLocations);
storage.removeVolumes(changedVolumes.deactivateLocations);
}
} catch (IOException e) {
LOG.warn("There is IOException when refreshing volumes! "
+ "Recover configurations: " + DFS_DATANODE_DATA_DIR_KEY
+ " = " + oldVolumes, e);
throw e;
}
}
private synchronized void setClusterId(final String nsCid, final String bpid
) throws IOException {
if(clusterId != null && !clusterId.equals(nsCid)) {
@ -822,7 +971,9 @@ public class DataNode extends Configured
// settings global for all BPs in the Data Node
this.secureResources = resources;
this.dataDirs = dataDirs;
synchronized (this) {
this.dataDirs = dataDirs;
}
this.conf = conf;
this.dnConf = new DNConf(conf);
this.spanReceiverHost = SpanReceiverHost.getInstance(conf);
@ -1112,7 +1263,9 @@ public class DataNode extends Configured
}
final String bpid = nsInfo.getBlockPoolID();
//read storage info, lock data dirs and transition fs state if necessary
storage.recoverTransitionRead(this, bpid, nsInfo, dataDirs, startOpt);
synchronized (this) {
storage.recoverTransitionRead(this, bpid, nsInfo, dataDirs, startOpt);
}
final StorageInfo bpStorage = storage.getBPStorage(bpid);
LOG.info("Setting up storage: nsid=" + bpStorage.getNamespaceID()
+ ";bpid=" + bpid + ";lv=" + storage.getLayoutVersion()

View File

@ -94,8 +94,8 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
public List<V> getVolumes();
/** Add an array of StorageLocation to FsDataset. */
public void addVolumes(Collection<StorageLocation> volumes)
throws IOException;
public List<StorageLocation> addVolumes(List<StorageLocation> volumes,
final Collection<String> bpids);
/** Removes a collection of volumes from FsDataset. */
public void removeVolumes(Collection<StorageLocation> volumes);

View File

@ -28,19 +28,23 @@ import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.StandardMBean;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -85,6 +89,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlo
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.util.DataChecksum;
@ -245,7 +250,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
+ ", volume failures tolerated: " + volFailuresTolerated);
}
storageMap = new HashMap<String, DatanodeStorage>();
storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
volumeMap = new ReplicaMap(this);
@SuppressWarnings("unchecked")
final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
@ -275,45 +280,124 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// storageMap and asyncDiskService, consistent.
FsVolumeImpl fsVolume = new FsVolumeImpl(
this, sd.getStorageUuid(), dir, this.conf, storageType);
fsVolume.getVolumeMap(volumeMap);
ReplicaMap tempVolumeMap = new ReplicaMap(this);
fsVolume.getVolumeMap(tempVolumeMap);
volumeMap.addAll(tempVolumeMap);
volumes.addVolume(fsVolume);
storageMap.put(sd.getStorageUuid(),
new DatanodeStorage(sd.getStorageUuid(),
DatanodeStorage.State.NORMAL,
storageType));
DatanodeStorage.State.NORMAL,
storageType));
asyncDiskService.addVolume(sd.getCurrentDir());
LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
}
private void addVolumeAndBlockPool(Collection<StorageLocation> dataLocations,
Storage.StorageDirectory sd, final Collection<String> bpids)
throws IOException {
final File dir = sd.getCurrentDir();
final StorageType storageType =
getStorageTypeFromLocations(dataLocations, sd.getRoot());
final FsVolumeImpl fsVolume = new FsVolumeImpl(
this, sd.getStorageUuid(), dir, this.conf, storageType);
final ReplicaMap tempVolumeMap = new ReplicaMap(fsVolume);
List<IOException> exceptions = Lists.newArrayList();
for (final String bpid : bpids) {
try {
fsVolume.addBlockPool(bpid, this.conf);
fsVolume.getVolumeMap(bpid, tempVolumeMap);
} catch (IOException e) {
LOG.warn("Caught exception when adding " + fsVolume +
". Will throw later.", e);
exceptions.add(e);
}
}
if (!exceptions.isEmpty()) {
// The states of FsDatasteImpl are not modified, thus no need to rolled back.
throw MultipleIOException.createIOException(exceptions);
}
volumeMap.addAll(tempVolumeMap);
storageMap.put(sd.getStorageUuid(),
new DatanodeStorage(sd.getStorageUuid(),
DatanodeStorage.State.NORMAL,
storageType));
asyncDiskService.addVolume(sd.getCurrentDir());
volumes.addVolume(fsVolume);
LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
}
/**
* Add an array of StorageLocation to FsDataset.
*
* @pre dataStorage must have these volumes.
* @param volumes
* @throws IOException
* @param volumes an array of storage locations for adding volumes.
* @param bpids block pool IDs.
* @return an array of successfully loaded volumes.
*/
@Override
public synchronized void addVolumes(Collection<StorageLocation> volumes)
throws IOException {
public synchronized List<StorageLocation> addVolumes(
final List<StorageLocation> volumes, final Collection<String> bpids) {
final Collection<StorageLocation> dataLocations =
DataNode.getStorageLocations(this.conf);
Map<String, Storage.StorageDirectory> allStorageDirs =
final Map<String, Storage.StorageDirectory> allStorageDirs =
new HashMap<String, Storage.StorageDirectory>();
for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
allStorageDirs.put(sd.getRoot().getAbsolutePath(), sd);
List<StorageLocation> succeedVolumes = Lists.newArrayList();
try {
for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
allStorageDirs.put(sd.getRoot().getCanonicalPath(), sd);
}
} catch (IOException ioe) {
LOG.warn("Caught exception when parsing storage URL.", ioe);
return succeedVolumes;
}
for (StorageLocation vol : volumes) {
String key = vol.getFile().getAbsolutePath();
if (!allStorageDirs.containsKey(key)) {
LOG.warn("Attempt to add an invalid volume: " + vol.getFile());
} else {
addVolume(dataLocations, allStorageDirs.get(key));
final boolean[] successFlags = new boolean[volumes.size()];
Arrays.fill(successFlags, false);
List<Thread> volumeAddingThreads = Lists.newArrayList();
for (int i = 0; i < volumes.size(); i++) {
final int idx = i;
Thread t = new Thread() {
public void run() {
StorageLocation vol = volumes.get(idx);
try {
String key = vol.getFile().getCanonicalPath();
if (!allStorageDirs.containsKey(key)) {
LOG.warn("Attempt to add an invalid volume: " + vol.getFile());
} else {
addVolumeAndBlockPool(dataLocations, allStorageDirs.get(key),
bpids);
successFlags[idx] = true;
}
} catch (IOException e) {
LOG.warn("Caught exception when adding volume " + vol, e);
}
}
};
volumeAddingThreads.add(t);
t.start();
}
for (Thread t : volumeAddingThreads) {
try {
t.join();
} catch (InterruptedException e) {
LOG.warn("Caught InterruptedException when adding volume.", e);
}
}
for (int i = 0; i < volumes.size(); i++) {
if (successFlags[i]) {
succeedVolumes.add(volumes.get(i));
}
}
return succeedVolumes;
}
/**
@ -335,9 +419,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
String volume = sd.getRoot().toString();
LOG.info("Removing " + volume + " from FsDataset.");
this.volumes.removeVolume(volume);
storageMap.remove(sd.getStorageUuid());
// Disable the volume from the service.
asyncDiskService.removeVolume(sd.getCurrentDir());
this.volumes.removeVolume(volume);
// Removed all replica information for the blocks on the volume. Unlike
// updating the volumeMap in addVolume(), this operation does not scan
@ -348,7 +432,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
it.hasNext(); ) {
ReplicaInfo block = it.next();
if (block.getVolume().getBasePath().equals(volume)) {
invalidate(bpid, block.getBlockId());
invalidate(bpid, block);
blocks.add(block);
it.remove();
}
@ -357,6 +441,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
datanode.getBlockScanner().deleteBlocks(bpid,
blocks.toArray(new Block[blocks.size()]));
}
storageMap.remove(sd.getStorageUuid());
}
}
}
@ -1345,23 +1431,26 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
/**
* Invalidate a block but does not delete the actual on-disk block file.
*
* It should only be used for decommissioning disks.
* It should only be used when deactivating disks.
*
* @param bpid the block pool ID.
* @param blockId the ID of the block.
* @param block The block to be invalidated.
*/
public void invalidate(String bpid, long blockId) {
public void invalidate(String bpid, ReplicaInfo block) {
// If a DFSClient has the replica in its cache of short-circuit file
// descriptors (and the client is using ShortCircuitShm), invalidate it.
// The short-circuit registry is null in the unit tests, because the
// datanode is mock object.
if (datanode.getShortCircuitRegistry() != null) {
datanode.getShortCircuitRegistry().processBlockInvalidation(
new ExtendedBlockId(blockId, bpid));
new ExtendedBlockId(block.getBlockId(), bpid));
// If the block is cached, start uncaching it.
cacheManager.uncacheBlock(bpid, blockId);
cacheManager.uncacheBlock(bpid, block.getBlockId());
}
datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid, block),
block.getStorageUuid());
}
/**

View File

@ -1093,7 +1093,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
@Override
public void addVolumes(Collection<StorageLocation> volumes) {
public List<StorageLocation> addVolumes(List<StorageLocation> volumes,
final Collection<String> bpids) {
throw new UnsupportedOperationException();
}

View File

@ -0,0 +1,423 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockMissingException;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TestDataNodeHotSwapVolumes {
private static final int BLOCK_SIZE = 512;
private MiniDFSCluster cluster;
@After
public void tearDown() {
shutdown();
}
private void startDFSCluster(int numNameNodes, int numDataNodes)
throws IOException {
shutdown();
Configuration conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
/*
* Lower the DN heartbeat, DF rate, and recheck interval to one second
* so state about failures and datanode death propagates faster.
*/
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_DF_INTERVAL_KEY, 1000);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
1000);
MiniDFSNNTopology nnTopology =
MiniDFSNNTopology.simpleFederatedTopology(numNameNodes);
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(nnTopology)
.numDataNodes(numDataNodes)
.build();
cluster.waitActive();
}
private void shutdown() {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
private void createFile(Path path, int numBlocks)
throws IOException, InterruptedException, TimeoutException {
final short replicateFactor = 1;
createFile(path, numBlocks, replicateFactor);
}
private void createFile(Path path, int numBlocks, short replicateFactor)
throws IOException, InterruptedException, TimeoutException {
createFile(0, path, numBlocks, replicateFactor);
}
private void createFile(int fsIdx, Path path, int numBlocks)
throws IOException, InterruptedException, TimeoutException {
final short replicateFactor = 1;
createFile(fsIdx, path, numBlocks, replicateFactor);
}
private void createFile(int fsIdx, Path path, int numBlocks,
short replicateFactor)
throws IOException, TimeoutException, InterruptedException {
final int seed = 0;
final DistributedFileSystem fs = cluster.getFileSystem(fsIdx);
DFSTestUtil.createFile(fs, path, BLOCK_SIZE * numBlocks,
replicateFactor, seed);
DFSTestUtil.waitReplication(fs, path, replicateFactor);
}
/**
* Verify whether a file has enough content.
*/
private static void verifyFileLength(FileSystem fs, Path path, int numBlocks)
throws IOException {
FileStatus status = fs.getFileStatus(path);
assertEquals(numBlocks * BLOCK_SIZE, status.getLen());
}
/** Return the number of replicas for a given block in the file. */
private static int getNumReplicas(FileSystem fs, Path file,
int blockIdx) throws IOException {
BlockLocation locs[] = fs.getFileBlockLocations(file, 0, Long.MAX_VALUE);
return locs.length < blockIdx + 1 ? 0 : locs[blockIdx].getNames().length;
}
/**
* Wait the block to have the exact number of replicas as expected.
*/
private static void waitReplication(FileSystem fs, Path file, int blockIdx,
int numReplicas)
throws IOException, TimeoutException, InterruptedException {
int attempts = 50; // Wait 5 seconds.
while (attempts > 0) {
if (getNumReplicas(fs, file, blockIdx) == numReplicas) {
return;
}
Thread.sleep(100);
attempts--;
}
throw new TimeoutException("Timed out waiting the " + blockIdx + "-th block"
+ " of " + file + " to have " + numReplicas + " replicas.");
}
/** Parses data dirs from DataNode's configuration. */
private static Collection<String> getDataDirs(DataNode datanode) {
return datanode.getConf().getTrimmedStringCollection(
DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
}
@Test
public void testParseChangedVolumes() throws IOException {
startDFSCluster(1, 1);
DataNode dn = cluster.getDataNodes().get(0);
Configuration conf = dn.getConf();
String oldPaths = conf.get(DFS_DATANODE_DATA_DIR_KEY);
List<StorageLocation> oldLocations = new ArrayList<StorageLocation>();
for (String path : oldPaths.split(",")) {
oldLocations.add(StorageLocation.parse(path));
}
assertFalse(oldLocations.isEmpty());
String newPaths = "/foo/path1,/foo/path2";
conf.set(DFS_DATANODE_DATA_DIR_KEY, newPaths);
DataNode.ChangedVolumes changedVolumes =dn.parseChangedVolumes();
List<StorageLocation> newVolumes = changedVolumes.newLocations;
assertEquals(2, newVolumes.size());
assertEquals("/foo/path1", newVolumes.get(0).getFile().getAbsolutePath());
assertEquals("/foo/path2", newVolumes.get(1).getFile().getAbsolutePath());
List<StorageLocation> removedVolumes = changedVolumes.deactivateLocations;
assertEquals(oldLocations.size(), removedVolumes.size());
for (int i = 0; i < removedVolumes.size(); i++) {
assertEquals(oldLocations.get(i).getFile(),
removedVolumes.get(i).getFile());
}
}
@Test
public void testParseChangedVolumesFailures() throws IOException {
startDFSCluster(1, 1);
DataNode dn = cluster.getDataNodes().get(0);
Configuration conf = dn.getConf();
try {
conf.set(DFS_DATANODE_DATA_DIR_KEY, "");
dn.parseChangedVolumes();
fail("Should throw IOException: empty inputs.");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains("No directory is specified.", e);
}
}
/** Add volumes to the first DataNode. */
private void addVolumes(int numNewVolumes) throws ReconfigurationException {
File dataDir = new File(cluster.getDataDirectory());
DataNode dn = cluster.getDataNodes().get(0); // First DataNode.
Configuration conf = dn.getConf();
String oldDataDir = conf.get(DFS_DATANODE_DATA_DIR_KEY);
List<File> newVolumeDirs = new ArrayList<File>();
StringBuilder newDataDirBuf = new StringBuilder(oldDataDir);
int startIdx = oldDataDir.split(",").length + 1;
// Find the first available (non-taken) directory name for data volume.
while (true) {
File volumeDir = new File(dataDir, "data" + startIdx);
if (!volumeDir.exists()) {
break;
}
startIdx++;
}
for (int i = startIdx; i < startIdx + numNewVolumes; i++) {
File volumeDir = new File(dataDir, "data" + String.valueOf(i));
newVolumeDirs.add(volumeDir);
volumeDir.mkdirs();
newDataDirBuf.append(",");
newDataDirBuf.append(volumeDir.toURI());
}
String newDataDir = newDataDirBuf.toString();
dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newDataDir);
assertEquals(newDataDir, conf.get(DFS_DATANODE_DATA_DIR_KEY));
// Check that all newly created volumes are appropriately formatted.
for (File volumeDir : newVolumeDirs) {
File curDir = new File(volumeDir, "current");
assertTrue(curDir.exists());
assertTrue(curDir.isDirectory());
}
}
private List<List<Integer>> getNumBlocksReport(int namesystemIdx) {
List<List<Integer>> results = new ArrayList<List<Integer>>();
final String bpid = cluster.getNamesystem(namesystemIdx).getBlockPoolId();
List<Map<DatanodeStorage, BlockListAsLongs>> blockReports =
cluster.getAllBlockReports(bpid);
for (Map<DatanodeStorage, BlockListAsLongs> datanodeReport : blockReports) {
List<Integer> numBlocksPerDN = new ArrayList<Integer>();
for (BlockListAsLongs blocks : datanodeReport.values()) {
numBlocksPerDN.add(blocks.getNumberOfBlocks());
}
results.add(numBlocksPerDN);
}
return results;
}
/**
* Test adding one volume on a running MiniDFSCluster with only one NameNode.
*/
@Test
public void testAddOneNewVolume()
throws IOException, ReconfigurationException,
InterruptedException, TimeoutException {
startDFSCluster(1, 1);
String bpid = cluster.getNamesystem().getBlockPoolId();
final int numBlocks = 10;
addVolumes(1);
Path testFile = new Path("/test");
createFile(testFile, numBlocks);
List<Map<DatanodeStorage, BlockListAsLongs>> blockReports =
cluster.getAllBlockReports(bpid);
assertEquals(1, blockReports.size()); // 1 DataNode
assertEquals(3, blockReports.get(0).size()); // 3 volumes
// FSVolumeList uses Round-Robin block chooser by default. Thus the new
// blocks should be evenly located in all volumes.
int minNumBlocks = Integer.MAX_VALUE;
int maxNumBlocks = Integer.MIN_VALUE;
for (BlockListAsLongs blockList : blockReports.get(0).values()) {
minNumBlocks = Math.min(minNumBlocks, blockList.getNumberOfBlocks());
maxNumBlocks = Math.max(maxNumBlocks, blockList.getNumberOfBlocks());
}
assertTrue(Math.abs(maxNumBlocks - maxNumBlocks) <= 1);
verifyFileLength(cluster.getFileSystem(), testFile, numBlocks);
}
@Test(timeout = 60000)
public void testAddVolumesDuringWrite()
throws IOException, InterruptedException, TimeoutException,
ReconfigurationException {
startDFSCluster(1, 1);
String bpid = cluster.getNamesystem().getBlockPoolId();
Path testFile = new Path("/test");
createFile(testFile, 4); // Each volume has 2 blocks.
addVolumes(2);
// Continue to write the same file, thus the new volumes will have blocks.
DFSTestUtil.appendFile(cluster.getFileSystem(), testFile, BLOCK_SIZE * 8);
verifyFileLength(cluster.getFileSystem(), testFile, 8 + 4);
// After appending data, there should be [2, 2, 4, 4] blocks in each volume
// respectively.
List<Integer> expectedNumBlocks = Arrays.asList(2, 2, 4, 4);
List<Map<DatanodeStorage, BlockListAsLongs>> blockReports =
cluster.getAllBlockReports(bpid);
assertEquals(1, blockReports.size()); // 1 DataNode
assertEquals(4, blockReports.get(0).size()); // 4 volumes
Map<DatanodeStorage, BlockListAsLongs> dnReport =
blockReports.get(0);
List<Integer> actualNumBlocks = new ArrayList<Integer>();
for (BlockListAsLongs blockList : dnReport.values()) {
actualNumBlocks.add(blockList.getNumberOfBlocks());
}
Collections.sort(actualNumBlocks);
assertEquals(expectedNumBlocks, actualNumBlocks);
}
@Test
public void testAddVolumesToFederationNN()
throws IOException, TimeoutException, InterruptedException,
ReconfigurationException {
// Starts a Cluster with 2 NameNode and 3 DataNodes. Each DataNode has 2
// volumes.
final int numNameNodes = 2;
final int numDataNodes = 1;
startDFSCluster(numNameNodes, numDataNodes);
Path testFile = new Path("/test");
// Create a file on the first namespace with 4 blocks.
createFile(0, testFile, 4);
// Create a file on the second namespace with 4 blocks.
createFile(1, testFile, 4);
// Add 2 volumes to the first DataNode.
final int numNewVolumes = 2;
addVolumes(numNewVolumes);
// Append to the file on the first namespace.
DFSTestUtil.appendFile(cluster.getFileSystem(0), testFile, BLOCK_SIZE * 8);
List<List<Integer>> actualNumBlocks = getNumBlocksReport(0);
assertEquals(cluster.getDataNodes().size(), actualNumBlocks.size());
List<Integer> blocksOnFirstDN = actualNumBlocks.get(0);
Collections.sort(blocksOnFirstDN);
assertEquals(Arrays.asList(2, 2, 4, 4), blocksOnFirstDN);
// Verify the second namespace also has the new volumes and they are empty.
actualNumBlocks = getNumBlocksReport(1);
assertEquals(4, actualNumBlocks.get(0).size());
assertEquals(numNewVolumes,
Collections.frequency(actualNumBlocks.get(0), 0));
}
@Test
public void testRemoveOneVolume()
throws ReconfigurationException, InterruptedException, TimeoutException,
IOException {
startDFSCluster(1, 1);
final short replFactor = 1;
Path testFile = new Path("/test");
createFile(testFile, 10, replFactor);
DataNode dn = cluster.getDataNodes().get(0);
Collection<String> oldDirs = getDataDirs(dn);
String newDirs = oldDirs.iterator().next(); // Keep the first volume.
dn.reconfigurePropertyImpl(
DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs);
dn.scheduleAllBlockReport(0);
try {
DFSTestUtil.readFile(cluster.getFileSystem(), testFile);
fail("Expect to throw BlockMissingException.");
} catch (BlockMissingException e) {
GenericTestUtils.assertExceptionContains("Could not obtain block", e);
}
Path newFile = new Path("/newFile");
createFile(newFile, 6);
String bpid = cluster.getNamesystem().getBlockPoolId();
List<Map<DatanodeStorage, BlockListAsLongs>> blockReports =
cluster.getAllBlockReports(bpid);
assertEquals((int)replFactor, blockReports.size());
BlockListAsLongs blocksForVolume1 =
blockReports.get(0).values().iterator().next();
// The first volume has half of the testFile and full of newFile.
assertEquals(10 / 2 + 6, blocksForVolume1.getNumberOfBlocks());
}
@Test
public void testReplicatingAfterRemoveVolume()
throws InterruptedException, TimeoutException, IOException,
ReconfigurationException {
startDFSCluster(1, 2);
final DistributedFileSystem fs = cluster.getFileSystem();
final short replFactor = 2;
Path testFile = new Path("/test");
createFile(testFile, 4, replFactor);
DataNode dn = cluster.getDataNodes().get(0);
Collection<String> oldDirs = getDataDirs(dn);
String newDirs = oldDirs.iterator().next(); // Keep the first volume.
dn.reconfigurePropertyImpl(
DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs);
// Force DataNode to report missing blocks.
dn.scheduleAllBlockReport(0);
DataNodeTestUtils.triggerDeletionReport(dn);
// The 2nd block only has 1 replica due to the removed data volume.
waitReplication(fs, testFile, 1, 1);
// Wait NameNode to replica missing blocks.
DFSTestUtil.waitReplication(fs, testFile, replFactor);
}
}

View File

@ -40,7 +40,10 @@ import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@ -117,6 +120,7 @@ public class TestFsDatasetImpl {
final int numExistingVolumes = dataset.getVolumes().size();
final int totalVolumes = numNewVolumes + numExistingVolumes;
List<StorageLocation> newLocations = new ArrayList<StorageLocation>();
Set<String> expectedVolumes = new HashSet<String>();
for (int i = 0; i < numNewVolumes; i++) {
String path = BASE_DIR + "/newData" + i;
newLocations.add(StorageLocation.parse(path));
@ -125,13 +129,15 @@ public class TestFsDatasetImpl {
}
when(storage.getNumStorageDirs()).thenReturn(totalVolumes);
dataset.addVolumes(newLocations);
dataset.addVolumes(newLocations, Arrays.asList(BLOCK_POOL_IDS));
assertEquals(totalVolumes, dataset.getVolumes().size());
assertEquals(totalVolumes, dataset.storageMap.size());
Set<String> actualVolumes = new HashSet<String>();
for (int i = 0; i < numNewVolumes; i++) {
assertEquals(newLocations.get(i).getFile().getPath(),
dataset.getVolumes().get(numExistingVolumes + i).getBasePath());
dataset.getVolumes().get(numExistingVolumes + i).getBasePath();
}
assertEquals(actualVolumes, expectedVolumes);
}
@Test