HDFS-6740. Make FSDataset support adding data volumes dynamically. Contributed by Lei Xu.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1616624 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
84d57bbfa8
commit
1fe7b04863
|
@ -115,6 +115,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
HDFS-6728. Dynamically add new volumes to DataStorage, formatted if
|
HDFS-6728. Dynamically add new volumes to DataStorage, formatted if
|
||||||
necessary. (Lei Xu vi atm)
|
necessary. (Lei Xu vi atm)
|
||||||
|
|
||||||
|
HDFS-6740. Make FSDataset support adding data volumes dynamically. (Lei
|
||||||
|
Xu via atm)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-6690. Deduplicate xattr names in memory. (wang)
|
HDFS-6690. Deduplicate xattr names in memory. (wang)
|
||||||
|
|
|
@ -78,7 +78,7 @@ public class StorageLocation {
|
||||||
* @return A StorageLocation object if successfully parsed, null otherwise.
|
* @return A StorageLocation object if successfully parsed, null otherwise.
|
||||||
* Does not throw any exceptions.
|
* Does not throw any exceptions.
|
||||||
*/
|
*/
|
||||||
static StorageLocation parse(String rawLocation)
|
public static StorageLocation parse(String rawLocation)
|
||||||
throws IOException, SecurityException {
|
throws IOException, SecurityException {
|
||||||
Matcher matcher = regex.matcher(rawLocation);
|
Matcher matcher = regex.matcher(rawLocation);
|
||||||
StorageType storageType = StorageType.DEFAULT;
|
StorageType storageType = StorageType.DEFAULT;
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.io.File;
|
||||||
import java.io.FileDescriptor;
|
import java.io.FileDescriptor;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
@ -39,6 +40,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataStorage;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
|
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.Replica;
|
import org.apache.hadoop.hdfs.server.datanode.Replica;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
|
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
|
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
||||||
|
@ -91,6 +93,10 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
|
||||||
/** @return a list of volumes. */
|
/** @return a list of volumes. */
|
||||||
public List<V> getVolumes();
|
public List<V> getVolumes();
|
||||||
|
|
||||||
|
/** Add an array of StorageLocation to FsDataset. */
|
||||||
|
public void addVolumes(Collection<StorageLocation> volumes)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
/** @return a storage with the given storage ID */
|
/** @return a storage with the given storage ID */
|
||||||
public DatanodeStorage getStorage(final String storageUuid);
|
public DatanodeStorage getStorage(final String storageUuid);
|
||||||
|
|
||||||
|
|
|
@ -61,6 +61,7 @@ class FsDatasetAsyncDiskService {
|
||||||
private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
|
private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
|
||||||
|
|
||||||
private final DataNode datanode;
|
private final DataNode datanode;
|
||||||
|
private final ThreadGroup threadGroup;
|
||||||
private Map<File, ThreadPoolExecutor> executors
|
private Map<File, ThreadPoolExecutor> executors
|
||||||
= new HashMap<File, ThreadPoolExecutor>();
|
= new HashMap<File, ThreadPoolExecutor>();
|
||||||
|
|
||||||
|
@ -70,16 +71,13 @@ class FsDatasetAsyncDiskService {
|
||||||
*
|
*
|
||||||
* The AsyncDiskServices uses one ThreadPool per volume to do the async
|
* The AsyncDiskServices uses one ThreadPool per volume to do the async
|
||||||
* disk operations.
|
* disk operations.
|
||||||
*
|
|
||||||
* @param volumes The roots of the data volumes.
|
|
||||||
*/
|
*/
|
||||||
FsDatasetAsyncDiskService(DataNode datanode, File[] volumes) {
|
FsDatasetAsyncDiskService(DataNode datanode) {
|
||||||
this.datanode = datanode;
|
this.datanode = datanode;
|
||||||
|
this.threadGroup = new ThreadGroup(getClass().getSimpleName());
|
||||||
|
}
|
||||||
|
|
||||||
final ThreadGroup threadGroup = new ThreadGroup(getClass().getSimpleName());
|
private void addExecutorForVolume(final File volume) {
|
||||||
// Create one ThreadPool per volume
|
|
||||||
for (int v = 0 ; v < volumes.length; v++) {
|
|
||||||
final File vol = volumes[v];
|
|
||||||
ThreadFactory threadFactory = new ThreadFactory() {
|
ThreadFactory threadFactory = new ThreadFactory() {
|
||||||
int counter = 0;
|
int counter = 0;
|
||||||
|
|
||||||
|
@ -91,7 +89,7 @@ class FsDatasetAsyncDiskService {
|
||||||
}
|
}
|
||||||
Thread t = new Thread(threadGroup, r);
|
Thread t = new Thread(threadGroup, r);
|
||||||
t.setName("Async disk worker #" + thisIndex +
|
t.setName("Async disk worker #" + thisIndex +
|
||||||
" for volume " + vol);
|
" for volume " + volume);
|
||||||
return t;
|
return t;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -103,9 +101,22 @@ class FsDatasetAsyncDiskService {
|
||||||
|
|
||||||
// This can reduce the number of running threads
|
// This can reduce the number of running threads
|
||||||
executor.allowCoreThreadTimeOut(true);
|
executor.allowCoreThreadTimeOut(true);
|
||||||
executors.put(vol, executor);
|
executors.put(volume, executor);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Starts AsyncDiskService for a new volume
|
||||||
|
* @param volume the root of the new data volume.
|
||||||
|
*/
|
||||||
|
synchronized void addVolume(File volume) {
|
||||||
|
if (executors == null) {
|
||||||
|
throw new RuntimeException("AsyncDiskService is already shutdown");
|
||||||
|
}
|
||||||
|
ThreadPoolExecutor executor = executors.get(volume);
|
||||||
|
if (executor != null) {
|
||||||
|
throw new RuntimeException("Volume " + volume + " is already existed.");
|
||||||
|
}
|
||||||
|
addExecutorForVolume(volume);
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized long countPendingDeletions() {
|
synchronized long countPendingDeletions() {
|
||||||
|
|
|
@ -202,6 +202,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
final Map<String, DatanodeStorage> storageMap;
|
final Map<String, DatanodeStorage> storageMap;
|
||||||
final FsDatasetAsyncDiskService asyncDiskService;
|
final FsDatasetAsyncDiskService asyncDiskService;
|
||||||
final FsDatasetCache cacheManager;
|
final FsDatasetCache cacheManager;
|
||||||
|
private final Configuration conf;
|
||||||
private final int validVolsRequired;
|
private final int validVolsRequired;
|
||||||
|
|
||||||
final ReplicaMap volumeMap;
|
final ReplicaMap volumeMap;
|
||||||
|
@ -216,6 +217,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
) throws IOException {
|
) throws IOException {
|
||||||
this.datanode = datanode;
|
this.datanode = datanode;
|
||||||
this.dataStorage = storage;
|
this.dataStorage = storage;
|
||||||
|
this.conf = conf;
|
||||||
// The number of volumes required for operation is the total number
|
// The number of volumes required for operation is the total number
|
||||||
// of volumes minus the number of failed volumes we can tolerate.
|
// of volumes minus the number of failed volumes we can tolerate.
|
||||||
final int volFailuresTolerated =
|
final int volFailuresTolerated =
|
||||||
|
@ -242,38 +244,76 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
}
|
}
|
||||||
|
|
||||||
storageMap = new HashMap<String, DatanodeStorage>();
|
storageMap = new HashMap<String, DatanodeStorage>();
|
||||||
final List<FsVolumeImpl> volArray = new ArrayList<FsVolumeImpl>(
|
|
||||||
storage.getNumStorageDirs());
|
|
||||||
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
|
|
||||||
Storage.StorageDirectory sd = storage.getStorageDir(idx);
|
|
||||||
final File dir = sd.getCurrentDir();
|
|
||||||
final StorageType storageType = getStorageTypeFromLocations(dataLocations, sd.getRoot());
|
|
||||||
volArray.add(new FsVolumeImpl(this, sd.getStorageUuid(), dir, conf,
|
|
||||||
storageType));
|
|
||||||
LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
|
|
||||||
storageMap.put(sd.getStorageUuid(),
|
|
||||||
new DatanodeStorage(sd.getStorageUuid(), DatanodeStorage.State.NORMAL, storageType));
|
|
||||||
}
|
|
||||||
volumeMap = new ReplicaMap(this);
|
volumeMap = new ReplicaMap(this);
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
|
final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
|
||||||
ReflectionUtils.newInstance(conf.getClass(
|
ReflectionUtils.newInstance(conf.getClass(
|
||||||
DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
|
DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
|
||||||
RoundRobinVolumeChoosingPolicy.class,
|
RoundRobinVolumeChoosingPolicy.class,
|
||||||
VolumeChoosingPolicy.class), conf);
|
VolumeChoosingPolicy.class), conf);
|
||||||
volumes = new FsVolumeList(volArray, volsFailed, blockChooserImpl);
|
volumes = new FsVolumeList(volsFailed, blockChooserImpl);
|
||||||
volumes.initializeReplicaMaps(volumeMap);
|
asyncDiskService = new FsDatasetAsyncDiskService(datanode);
|
||||||
|
|
||||||
File[] roots = new File[storage.getNumStorageDirs()];
|
|
||||||
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
|
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
|
||||||
roots[idx] = storage.getStorageDir(idx).getCurrentDir();
|
addVolume(dataLocations, storage.getStorageDir(idx));
|
||||||
}
|
}
|
||||||
asyncDiskService = new FsDatasetAsyncDiskService(datanode, roots);
|
|
||||||
cacheManager = new FsDatasetCache(this);
|
cacheManager = new FsDatasetCache(this);
|
||||||
registerMBean(datanode.getDatanodeUuid());
|
registerMBean(datanode.getDatanodeUuid());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void addVolume(Collection<StorageLocation> dataLocations,
|
||||||
|
Storage.StorageDirectory sd) throws IOException {
|
||||||
|
final File dir = sd.getCurrentDir();
|
||||||
|
final StorageType storageType =
|
||||||
|
getStorageTypeFromLocations(dataLocations, sd.getRoot());
|
||||||
|
|
||||||
|
// If IOException raises from FsVolumeImpl() or getVolumeMap(), there is
|
||||||
|
// nothing needed to be rolled back to make various data structures, e.g.,
|
||||||
|
// storageMap and asyncDiskService, consistent.
|
||||||
|
FsVolumeImpl fsVolume = new FsVolumeImpl(
|
||||||
|
this, sd.getStorageUuid(), dir, this.conf, storageType);
|
||||||
|
fsVolume.getVolumeMap(volumeMap);
|
||||||
|
|
||||||
|
volumes.addVolume(fsVolume);
|
||||||
|
storageMap.put(sd.getStorageUuid(),
|
||||||
|
new DatanodeStorage(sd.getStorageUuid(),
|
||||||
|
DatanodeStorage.State.NORMAL,
|
||||||
|
storageType));
|
||||||
|
asyncDiskService.addVolume(sd.getCurrentDir());
|
||||||
|
|
||||||
|
LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add an array of StorageLocation to FsDataset.
|
||||||
|
*
|
||||||
|
* @pre dataStorage must have these volumes.
|
||||||
|
* @param volumes
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public synchronized void addVolumes(Collection<StorageLocation> volumes)
|
||||||
|
throws IOException {
|
||||||
|
final Collection<StorageLocation> dataLocations =
|
||||||
|
DataNode.getStorageLocations(this.conf);
|
||||||
|
Map<String, Storage.StorageDirectory> allStorageDirs =
|
||||||
|
new HashMap<String, Storage.StorageDirectory>();
|
||||||
|
for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
|
||||||
|
Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
|
||||||
|
allStorageDirs.put(sd.getRoot().getAbsolutePath(), sd);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (StorageLocation vol : volumes) {
|
||||||
|
String key = vol.getFile().getAbsolutePath();
|
||||||
|
if (!allStorageDirs.containsKey(key)) {
|
||||||
|
LOG.warn("Attempt to add an invalid volume: " + vol.getFile());
|
||||||
|
} else {
|
||||||
|
addVolume(dataLocations, allStorageDirs.get(key));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private StorageType getStorageTypeFromLocations(
|
private StorageType getStorageTypeFromLocations(
|
||||||
Collection<StorageLocation> dataLocations, File dir) {
|
Collection<StorageLocation> dataLocations, File dir) {
|
||||||
for (StorageLocation dataLocation : dataLocations) {
|
for (StorageLocation dataLocation : dataLocations) {
|
||||||
|
|
|
@ -40,9 +40,8 @@ class FsVolumeList {
|
||||||
private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
|
private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
|
||||||
private volatile int numFailedVolumes;
|
private volatile int numFailedVolumes;
|
||||||
|
|
||||||
FsVolumeList(List<FsVolumeImpl> volumes, int failedVols,
|
FsVolumeList(int failedVols,
|
||||||
VolumeChoosingPolicy<FsVolumeImpl> blockChooser) {
|
VolumeChoosingPolicy<FsVolumeImpl> blockChooser) {
|
||||||
this.volumes = Collections.unmodifiableList(volumes);
|
|
||||||
this.blockChooser = blockChooser;
|
this.blockChooser = blockChooser;
|
||||||
this.numFailedVolumes = failedVols;
|
this.numFailedVolumes = failedVols;
|
||||||
}
|
}
|
||||||
|
@ -102,12 +101,6 @@ class FsVolumeList {
|
||||||
return remaining;
|
return remaining;
|
||||||
}
|
}
|
||||||
|
|
||||||
void initializeReplicaMaps(ReplicaMap globalReplicaMap) throws IOException {
|
|
||||||
for (FsVolumeImpl v : volumes) {
|
|
||||||
v.getVolumeMap(globalReplicaMap);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void getAllVolumesMap(final String bpid, final ReplicaMap volumeMap) throws IOException {
|
void getAllVolumesMap(final String bpid, final ReplicaMap volumeMap) throws IOException {
|
||||||
long totalStartTime = Time.monotonicNow();
|
long totalStartTime = Time.monotonicNow();
|
||||||
final List<IOException> exceptions = Collections.synchronizedList(
|
final List<IOException> exceptions = Collections.synchronizedList(
|
||||||
|
@ -205,6 +198,19 @@ class FsVolumeList {
|
||||||
return volumes.toString();
|
return volumes.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dynamically add new volumes to the existing volumes that this DN manages.
|
||||||
|
* @param newVolume the instance of new FsVolumeImpl.
|
||||||
|
*/
|
||||||
|
synchronized void addVolume(FsVolumeImpl newVolume) {
|
||||||
|
// Make a copy of volumes to add new volumes.
|
||||||
|
final List<FsVolumeImpl> volumeList = volumes == null ?
|
||||||
|
new ArrayList<FsVolumeImpl>() :
|
||||||
|
new ArrayList<FsVolumeImpl>(volumes);
|
||||||
|
volumeList.add(newVolume);
|
||||||
|
volumes = Collections.unmodifiableList(volumeList);
|
||||||
|
FsDatasetImpl.LOG.info("Added new volume: " + newVolume.toString());
|
||||||
|
}
|
||||||
|
|
||||||
void addBlockPool(final String bpid, final Configuration conf) throws IOException {
|
void addBlockPool(final String bpid, final Configuration conf) throws IOException {
|
||||||
long totalStartTime = Time.monotonicNow();
|
long totalStartTime = Time.monotonicNow();
|
||||||
|
|
|
@ -23,6 +23,7 @@ import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
@ -1082,6 +1083,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addVolumes(Collection<StorageLocation> volumes) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public DatanodeStorage getStorage(final String storageUuid) {
|
public DatanodeStorage getStorage(final String storageUuid) {
|
||||||
return storageUuid.equals(storage.getStorageUuid()) ?
|
return storageUuid.equals(storage.getStorageUuid()) ?
|
||||||
|
|
|
@ -0,0 +1,103 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DNConf;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
public class TestFsDatasetImpl {
|
||||||
|
private static final String BASE_DIR =
|
||||||
|
System.getProperty("test.build.dir") + "/fsdatasetimpl";
|
||||||
|
private static final int NUM_INIT_VOLUMES = 2;
|
||||||
|
|
||||||
|
private DataStorage storage;
|
||||||
|
private FsDatasetImpl dataset;
|
||||||
|
|
||||||
|
private static void createStorageDirs(DataStorage storage, Configuration conf,
|
||||||
|
int numDirs) throws IOException {
|
||||||
|
List<Storage.StorageDirectory> dirs =
|
||||||
|
new ArrayList<Storage.StorageDirectory>();
|
||||||
|
List<String> dirStrings = new ArrayList<String>();
|
||||||
|
for (int i = 0; i < numDirs; i++) {
|
||||||
|
String loc = BASE_DIR + "/data" + i;
|
||||||
|
dirStrings.add(loc);
|
||||||
|
dirs.add(new Storage.StorageDirectory(new File(loc)));
|
||||||
|
when(storage.getStorageDir(i)).thenReturn(dirs.get(i));
|
||||||
|
}
|
||||||
|
|
||||||
|
String dataDir = StringUtils.join(",", dirStrings);
|
||||||
|
conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dataDir);
|
||||||
|
when(storage.getNumStorageDirs()).thenReturn(numDirs);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws IOException {
|
||||||
|
final DataNode datanode = Mockito.mock(DataNode.class);
|
||||||
|
storage = Mockito.mock(DataStorage.class);
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
final DNConf dnConf = new DNConf(conf);
|
||||||
|
|
||||||
|
when(datanode.getConf()).thenReturn(conf);
|
||||||
|
when(datanode.getDnConf()).thenReturn(dnConf);
|
||||||
|
|
||||||
|
createStorageDirs(storage, conf, NUM_INIT_VOLUMES);
|
||||||
|
dataset = new FsDatasetImpl(datanode, storage, conf);
|
||||||
|
|
||||||
|
assertEquals(NUM_INIT_VOLUMES, dataset.getVolumes().size());
|
||||||
|
assertEquals(0, dataset.getNumFailedVolumes());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddVolumes() throws IOException {
|
||||||
|
final int numNewVolumes = 3;
|
||||||
|
final int numExistingVolumes = dataset.getVolumes().size();
|
||||||
|
final int totalVolumes = numNewVolumes + numExistingVolumes;
|
||||||
|
List<StorageLocation> newLocations = new ArrayList<StorageLocation>();
|
||||||
|
for (int i = 0; i < numNewVolumes; i++) {
|
||||||
|
String path = BASE_DIR + "/newData" + i;
|
||||||
|
newLocations.add(StorageLocation.parse(path));
|
||||||
|
when(storage.getStorageDir(numExistingVolumes + i))
|
||||||
|
.thenReturn(new Storage.StorageDirectory(new File(path)));
|
||||||
|
}
|
||||||
|
when(storage.getNumStorageDirs()).thenReturn(totalVolumes);
|
||||||
|
|
||||||
|
dataset.addVolumes(newLocations);
|
||||||
|
assertEquals(totalVolumes, dataset.getVolumes().size());
|
||||||
|
for (int i = 0; i < numNewVolumes; i++) {
|
||||||
|
assertEquals(newLocations.get(i).getFile().getPath(),
|
||||||
|
dataset.getVolumes().get(numExistingVolumes + i).getBasePath());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue