HDFS-6740. Make FSDataset support adding data volumes dynamically. Contributed by Lei Xu.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1616623 13f79535-47bb-0310-9956-ffa450edef68
Aaron Myers 2014-08-07 22:59:59 +00:00
parent 8b32f84e87
commit d758be1f35
8 changed files with 231 additions and 56 deletions


@@ -373,6 +373,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6728. Dynamically add new volumes to DataStorage, formatted if
necessary. (Lei Xu via atm)
HDFS-6740. Make FSDataset support adding data volumes dynamically. (Lei
Xu via atm)
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)


@@ -78,7 +78,7 @@ public class StorageLocation {
* @return A StorageLocation object if successfully parsed, null otherwise.
* Does not throw any exceptions.
*/
static StorageLocation parse(String rawLocation)
public static StorageLocation parse(String rawLocation)
throws IOException, SecurityException {
Matcher matcher = regex.matcher(rawLocation);
StorageType storageType = StorageType.DEFAULT;
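
The only functional change to StorageLocation here is widening parse() from package-private to public, so code outside org.apache.hadoop.hdfs.server.datanode (such as the new TestFsDatasetImpl at the end of this commit) can build StorageLocation objects directly. Below is a minimal, hypothetical usage sketch; it assumes the existing getFile()/getStorageType() accessors and the "[STORAGE_TYPE]path" syntax matched by the regex above, and the sample paths are invented.

import org.apache.hadoop.hdfs.server.datanode.StorageLocation;

public class ParseLocationsExample {
  public static void main(String[] args) throws Exception {
    // A bare path is parsed with StorageType.DEFAULT ...
    StorageLocation disk = StorageLocation.parse("/data/hdfs/dn0");
    // ... while a "[TYPE]" prefix selects an explicit storage type.
    StorageLocation ssd = StorageLocation.parse("[SSD]/data/ssd0");
    System.out.println(disk.getStorageType() + " -> " + disk.getFile());
    System.out.println(ssd.getStorageType() + " -> " + ssd.getFile());
  }
}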


@@ -22,6 +22,7 @@ import java.io.File;
import java.io.FileDescriptor;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -91,6 +93,10 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
/** @return a list of volumes. */
public List<V> getVolumes();
/** Add a collection of StorageLocations to FsDataset. */
public void addVolumes(Collection<StorageLocation> volumes)
throws IOException;
/** @return a storage with the given storage ID */
public DatanodeStorage getStorage(final String storageUuid);
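
The new addVolumes() method is the dataset-level entry point for dynamic volume addition. A small sketch of how a caller holding an FsDatasetSpi might use it; the VolumeRefresher class and refreshVolumes() method are invented for illustration and are not part of this patch.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;

class VolumeRefresher {
  private final FsDatasetSpi<?> dataset;

  VolumeRefresher(FsDatasetSpi<?> dataset) {
    this.dataset = dataset;
  }

  /** Parse the given directories and hand them to the dataset as new volumes. */
  void refreshVolumes(String... newDirs) throws IOException {
    Collection<StorageLocation> locations = new ArrayList<StorageLocation>();
    for (String dir : newDirs) {
      locations.add(StorageLocation.parse(dir));
    }
    dataset.addVolumes(locations);
  }
}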


@@ -61,6 +61,7 @@ class FsDatasetAsyncDiskService {
private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
private final DataNode datanode;
private final ThreadGroup threadGroup;
private Map<File, ThreadPoolExecutor> executors
= new HashMap<File, ThreadPoolExecutor>();
@@ -70,42 +71,52 @@ class FsDatasetAsyncDiskService {
*
* The AsyncDiskService uses one ThreadPool per volume to do the async
* disk operations.
*
* @param volumes The roots of the data volumes.
*/
FsDatasetAsyncDiskService(DataNode datanode, File[] volumes) {
FsDatasetAsyncDiskService(DataNode datanode) {
this.datanode = datanode;
this.threadGroup = new ThreadGroup(getClass().getSimpleName());
}
final ThreadGroup threadGroup = new ThreadGroup(getClass().getSimpleName());
// Create one ThreadPool per volume
for (int v = 0 ; v < volumes.length; v++) {
final File vol = volumes[v];
ThreadFactory threadFactory = new ThreadFactory() {
int counter = 0;
private void addExecutorForVolume(final File volume) {
ThreadFactory threadFactory = new ThreadFactory() {
int counter = 0;
@Override
public Thread newThread(Runnable r) {
int thisIndex;
synchronized (this) {
thisIndex = counter++;
}
Thread t = new Thread(threadGroup, r);
t.setName("Async disk worker #" + thisIndex +
" for volume " + vol);
return t;
}
};
@Override
public Thread newThread(Runnable r) {
int thisIndex;
synchronized (this) {
thisIndex = counter++;
}
Thread t = new Thread(threadGroup, r);
t.setName("Async disk worker #" + thisIndex +
" for volume " + volume);
return t;
}
};
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME,
THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), threadFactory);
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME,
THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), threadFactory);
// This can reduce the number of running threads
executor.allowCoreThreadTimeOut(true);
executors.put(vol, executor);
// This can reduce the number of running threads
executor.allowCoreThreadTimeOut(true);
executors.put(volume, executor);
}
/**
* Starts AsyncDiskService for a new volume
* @param volume the root of the new data volume.
*/
synchronized void addVolume(File volume) {
if (executors == null) {
throw new RuntimeException("AsyncDiskService is already shutdown");
}
ThreadPoolExecutor executor = executors.get(volume);
if (executor != null) {
throw new RuntimeException("Volume " + volume + " is already existed.");
}
addExecutorForVolume(volume);
}
synchronized long countPendingDeletions() {
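
The refactor above replaces the constructor-time loop over volume roots with addExecutorForVolume()/addVolume(), keeping the design stated in the class javadoc: one ThreadPoolExecutor per volume, presumably so slow disk operations on one volume do not block the others. A self-contained sketch of that pattern follows; the VolumeExecutors and submitForVolume names, and the pool sizes, are invented for illustration.

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class VolumeExecutors {
  private final Map<File, ThreadPoolExecutor> executors =
      new HashMap<File, ThreadPoolExecutor>();

  /** Register a dedicated pool for a newly added volume. */
  synchronized void addVolume(File volume) {
    if (executors.containsKey(volume)) {
      throw new IllegalStateException("Volume " + volume + " already exists.");
    }
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
        1, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    // Let idle core threads time out so an unused volume costs no threads.
    executor.allowCoreThreadTimeOut(true);
    executors.put(volume, executor);
  }

  /** Route a disk task to the pool owned by its volume. */
  synchronized void submitForVolume(File volume, Runnable task) {
    executors.get(volume).execute(task);
  }
}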


@@ -202,6 +202,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
final Map<String, DatanodeStorage> storageMap;
final FsDatasetAsyncDiskService asyncDiskService;
final FsDatasetCache cacheManager;
private final Configuration conf;
private final int validVolsRequired;
final ReplicaMap volumeMap;
@@ -216,6 +217,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
) throws IOException {
this.datanode = datanode;
this.dataStorage = storage;
this.conf = conf;
// The number of volumes required for operation is the total number
// of volumes minus the number of failed volumes we can tolerate.
final int volFailuresTolerated =
@@ -242,38 +244,76 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
storageMap = new HashMap<String, DatanodeStorage>();
final List<FsVolumeImpl> volArray = new ArrayList<FsVolumeImpl>(
storage.getNumStorageDirs());
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
Storage.StorageDirectory sd = storage.getStorageDir(idx);
final File dir = sd.getCurrentDir();
final StorageType storageType = getStorageTypeFromLocations(dataLocations, sd.getRoot());
volArray.add(new FsVolumeImpl(this, sd.getStorageUuid(), dir, conf,
storageType));
LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
storageMap.put(sd.getStorageUuid(),
new DatanodeStorage(sd.getStorageUuid(), DatanodeStorage.State.NORMAL, storageType));
}
volumeMap = new ReplicaMap(this);
@SuppressWarnings("unchecked")
final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
ReflectionUtils.newInstance(conf.getClass(
DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
RoundRobinVolumeChoosingPolicy.class,
VolumeChoosingPolicy.class), conf);
volumes = new FsVolumeList(volArray, volsFailed, blockChooserImpl);
volumes.initializeReplicaMaps(volumeMap);
volumes = new FsVolumeList(volsFailed, blockChooserImpl);
asyncDiskService = new FsDatasetAsyncDiskService(datanode);
File[] roots = new File[storage.getNumStorageDirs()];
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
roots[idx] = storage.getStorageDir(idx).getCurrentDir();
addVolume(dataLocations, storage.getStorageDir(idx));
}
asyncDiskService = new FsDatasetAsyncDiskService(datanode, roots);
cacheManager = new FsDatasetCache(this);
registerMBean(datanode.getDatanodeUuid());
}
private void addVolume(Collection<StorageLocation> dataLocations,
Storage.StorageDirectory sd) throws IOException {
final File dir = sd.getCurrentDir();
final StorageType storageType =
getStorageTypeFromLocations(dataLocations, sd.getRoot());
// If an IOException is thrown from FsVolumeImpl() or getVolumeMap(), nothing
// needs to be rolled back to keep the various data structures, e.g.,
// storageMap and asyncDiskService, consistent.
FsVolumeImpl fsVolume = new FsVolumeImpl(
this, sd.getStorageUuid(), dir, this.conf, storageType);
fsVolume.getVolumeMap(volumeMap);
volumes.addVolume(fsVolume);
storageMap.put(sd.getStorageUuid(),
new DatanodeStorage(sd.getStorageUuid(),
DatanodeStorage.State.NORMAL,
storageType));
asyncDiskService.addVolume(sd.getCurrentDir());
LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
}
/**
* Add a collection of StorageLocations to FsDataset.
*
* @pre dataStorage must have these volumes.
* @param volumes the StorageLocations of the volumes to be added.
* @throws IOException
*/
@Override
public synchronized void addVolumes(Collection<StorageLocation> volumes)
throws IOException {
final Collection<StorageLocation> dataLocations =
DataNode.getStorageLocations(this.conf);
Map<String, Storage.StorageDirectory> allStorageDirs =
new HashMap<String, Storage.StorageDirectory>();
for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
allStorageDirs.put(sd.getRoot().getAbsolutePath(), sd);
}
for (StorageLocation vol : volumes) {
String key = vol.getFile().getAbsolutePath();
if (!allStorageDirs.containsKey(key)) {
LOG.warn("Attempt to add an invalid volume: " + vol.getFile());
} else {
addVolume(dataLocations, allStorageDirs.get(key));
}
}
}
private StorageType getStorageTypeFromLocations(
Collection<StorageLocation> dataLocations, File dir) {
for (StorageLocation dataLocation : dataLocations) {
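
The public addVolumes() above only wires up volumes that DataStorage already knows about: each requested StorageLocation is matched against the existing storage directories by absolute path, and anything unknown is logged and skipped (per the @pre, HDFS-6728 is expected to have added and formatted the directories in DataStorage first). A toy reproduction of that matching step, with invented paths and names, just to make the precondition concrete:

import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class VolumeMatchingSketch {
  public static void main(String[] args) {
    // Directories DataStorage already manages, keyed by absolute path,
    // mirroring how addVolumes() builds allStorageDirs.
    Map<String, File> allStorageDirs = new HashMap<String, File>();
    for (String dir : new String[] {"/data/dn0", "/data/dn1"}) {
      allStorageDirs.put(new File(dir).getAbsolutePath(), new File(dir));
    }
    // Requested additions: one known to storage, one that is not.
    List<File> requested =
        Arrays.asList(new File("/data/dn1"), new File("/data/unknown"));
    for (File vol : requested) {
      String key = vol.getAbsolutePath();
      if (!allStorageDirs.containsKey(key)) {
        System.out.println("Attempt to add an invalid volume: " + vol);
      } else {
        System.out.println("Adding volume " + allStorageDirs.get(key));
      }
    }
  }
}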


@@ -40,9 +40,8 @@ class FsVolumeList {
private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
private volatile int numFailedVolumes;
FsVolumeList(List<FsVolumeImpl> volumes, int failedVols,
FsVolumeList(int failedVols,
VolumeChoosingPolicy<FsVolumeImpl> blockChooser) {
this.volumes = Collections.unmodifiableList(volumes);
this.blockChooser = blockChooser;
this.numFailedVolumes = failedVols;
}
@ -101,12 +100,6 @@ class FsVolumeList {
}
return remaining;
}
void initializeReplicaMaps(ReplicaMap globalReplicaMap) throws IOException {
for (FsVolumeImpl v : volumes) {
v.getVolumeMap(globalReplicaMap);
}
}
void getAllVolumesMap(final String bpid, final ReplicaMap volumeMap) throws IOException {
long totalStartTime = Time.monotonicNow();
@ -205,6 +198,19 @@ class FsVolumeList {
return volumes.toString();
}
/**
* Dynamically add a new volume to the set of volumes that this DN manages.
* @param newVolume the new FsVolumeImpl instance.
*/
synchronized void addVolume(FsVolumeImpl newVolume) {
// Make a copy of the volume list and add the new volume to it.
final List<FsVolumeImpl> volumeList = volumes == null ?
new ArrayList<FsVolumeImpl>() :
new ArrayList<FsVolumeImpl>(volumes);
volumeList.add(newVolume);
volumes = Collections.unmodifiableList(volumeList);
FsDatasetImpl.LOG.info("Added new volume: " + newVolume.toString());
}
void addBlockPool(final String bpid, final Configuration conf) throws IOException {
long totalStartTime = Time.monotonicNow();
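
FsVolumeList.addVolume() uses a copy-on-write update: it builds a fresh list, adds the new volume, and republishes it as an unmodifiable list, so readers that already hold a reference keep iterating a stable snapshot and only writers take the lock. A generic sketch of that idiom, under the assumption that the volumes field is declared volatile (its declaration is not shown in this hunk); CopyOnWriteRegistry is an invented name.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class CopyOnWriteRegistry<T> {
  // volatile so readers always see the most recently published list.
  private volatile List<T> items = Collections.emptyList();

  synchronized void add(T item) {
    List<T> copy = new ArrayList<T>(items);
    copy.add(item);
    items = Collections.unmodifiableList(copy);  // publish atomically
  }

  /** Lock-free read: the returned list never changes after publication. */
  List<T> snapshot() {
    return items;
  }
}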


@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
@@ -1082,6 +1083,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
throw new UnsupportedOperationException();
}
@Override
public void addVolumes(Collection<StorageLocation> volumes) {
throw new UnsupportedOperationException();
}
@Override
public DatanodeStorage getStorage(final String storageUuid) {
return storageUuid.equals(storage.getStorageUuid()) ?


@@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.util.StringUtils;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;
public class TestFsDatasetImpl {
private static final String BASE_DIR =
System.getProperty("test.build.dir") + "/fsdatasetimpl";
private static final int NUM_INIT_VOLUMES = 2;
private DataStorage storage;
private FsDatasetImpl dataset;
private static void createStorageDirs(DataStorage storage, Configuration conf,
int numDirs) throws IOException {
List<Storage.StorageDirectory> dirs =
new ArrayList<Storage.StorageDirectory>();
List<String> dirStrings = new ArrayList<String>();
for (int i = 0; i < numDirs; i++) {
String loc = BASE_DIR + "/data" + i;
dirStrings.add(loc);
dirs.add(new Storage.StorageDirectory(new File(loc)));
when(storage.getStorageDir(i)).thenReturn(dirs.get(i));
}
String dataDir = StringUtils.join(",", dirStrings);
conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dataDir);
when(storage.getNumStorageDirs()).thenReturn(numDirs);
}
@Before
public void setUp() throws IOException {
final DataNode datanode = Mockito.mock(DataNode.class);
storage = Mockito.mock(DataStorage.class);
Configuration conf = new Configuration();
final DNConf dnConf = new DNConf(conf);
when(datanode.getConf()).thenReturn(conf);
when(datanode.getDnConf()).thenReturn(dnConf);
createStorageDirs(storage, conf, NUM_INIT_VOLUMES);
dataset = new FsDatasetImpl(datanode, storage, conf);
assertEquals(NUM_INIT_VOLUMES, dataset.getVolumes().size());
assertEquals(0, dataset.getNumFailedVolumes());
}
@Test
public void testAddVolumes() throws IOException {
final int numNewVolumes = 3;
final int numExistingVolumes = dataset.getVolumes().size();
final int totalVolumes = numNewVolumes + numExistingVolumes;
List<StorageLocation> newLocations = new ArrayList<StorageLocation>();
for (int i = 0; i < numNewVolumes; i++) {
String path = BASE_DIR + "/newData" + i;
newLocations.add(StorageLocation.parse(path));
when(storage.getStorageDir(numExistingVolumes + i))
.thenReturn(new Storage.StorageDirectory(new File(path)));
}
when(storage.getNumStorageDirs()).thenReturn(totalVolumes);
dataset.addVolumes(newLocations);
assertEquals(totalVolumes, dataset.getVolumes().size());
for (int i = 0; i < numNewVolumes; i++) {
assertEquals(newLocations.get(i).getFile().getPath(),
dataset.getVolumes().get(numExistingVolumes + i).getBasePath());
}
}
}