HDDS-116. Implement VolumeSet to manage disk volumes. Contributed by Hanisha Koneru.
This commit is contained in:
parent
6cd19b45ef
commit
59777185fc
|
@ -141,6 +141,7 @@ public final class ScmConfigKeys {
|
|||
public static final String HDDS_REST_HTTP_ADDRESS_KEY =
|
||||
"hdds.rest.http-address";
|
||||
public static final String HDDS_REST_HTTP_ADDRESS_DEFAULT = "0.0.0.0:9880";
|
||||
public static final String HDDS_DATANODE_DIR_KEY = "hdds.datanode.dir";
|
||||
public static final String HDDS_REST_CSRF_ENABLED_KEY =
|
||||
"hdds.rest.rest-csrf.enabled";
|
||||
public static final boolean HDDS_REST_CSRF_ENABLED_DEFAULT = false;
|
||||
|
|
|
@ -0,0 +1,82 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.container.common.impl;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* Choose volumes in round-robin order.
|
||||
* Use fine-grained locks to synchronize volume choosing.
|
||||
*/
|
||||
public class RoundRobinVolumeChoosingPolicy implements VolumeChoosingPolicy {
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(
|
||||
RoundRobinVolumeChoosingPolicy.class);
|
||||
|
||||
// Stores the index of the next volume to be returned.
|
||||
private AtomicInteger nextVolumeIndex = new AtomicInteger(0);
|
||||
|
||||
@Override
|
||||
public VolumeInfo chooseVolume(List<VolumeInfo> volumes,
|
||||
long maxContainerSize) throws IOException {
|
||||
|
||||
// No volumes available to choose from
|
||||
if (volumes.size() < 1) {
|
||||
throw new DiskOutOfSpaceException("No more available volumes");
|
||||
}
|
||||
|
||||
// since volumes could've been removed because of the failure
|
||||
// make sure we are not out of bounds
|
||||
int nextIndex = nextVolumeIndex.get();
|
||||
int currentVolumeIndex = nextIndex < volumes.size() ? nextIndex : 0;
|
||||
|
||||
int startVolumeIndex = currentVolumeIndex;
|
||||
long maxAvailable = 0;
|
||||
|
||||
while (true) {
|
||||
final VolumeInfo volume = volumes.get(currentVolumeIndex);
|
||||
long availableVolumeSize = volume.getAvailable();
|
||||
|
||||
currentVolumeIndex = (currentVolumeIndex + 1) % volumes.size();
|
||||
|
||||
if (availableVolumeSize > maxContainerSize) {
|
||||
nextVolumeIndex.compareAndSet(nextIndex, currentVolumeIndex);
|
||||
return volume;
|
||||
}
|
||||
|
||||
if (availableVolumeSize > maxAvailable) {
|
||||
maxAvailable = availableVolumeSize;
|
||||
}
|
||||
|
||||
if (currentVolumeIndex == startVolumeIndex) {
|
||||
throw new DiskOutOfSpaceException("Out of space: "
|
||||
+ "The volume with the most available space (=" + maxAvailable
|
||||
+ " B) is less than the container size (=" + maxContainerSize
|
||||
+ " B).");
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,125 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.container.common.impl;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
* Stores information about a disk/volume.
|
||||
*/
|
||||
public class VolumeInfo {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(VolumeInfo.class);
|
||||
|
||||
private final Path rootDir;
|
||||
private final StorageType storageType;
|
||||
private VolumeState state;
|
||||
// Capacity configured. This is useful when we want to
|
||||
// limit the visible capacity for tests. If negative, then we just
|
||||
// query from the filesystem.
|
||||
private long configuredCapacity;
|
||||
private volatile AtomicLong scmUsed = new AtomicLong(0);
|
||||
|
||||
public static class Builder {
|
||||
private final Path rootDir;
|
||||
private StorageType storageType;
|
||||
private VolumeState state;
|
||||
private long configuredCapacity;
|
||||
|
||||
public Builder(Path rootDir) {
|
||||
this.rootDir = rootDir;
|
||||
}
|
||||
|
||||
public Builder(String rootDirStr) {
|
||||
this.rootDir = new Path(rootDirStr);
|
||||
}
|
||||
|
||||
public Builder storageType(StorageType storageType) {
|
||||
this.storageType = storageType;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder volumeState(VolumeState state) {
|
||||
this.state = state;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder configuredCapacity(long configuredCapacity) {
|
||||
this.configuredCapacity = configuredCapacity;
|
||||
return this;
|
||||
}
|
||||
|
||||
public VolumeInfo build() throws IOException {
|
||||
return new VolumeInfo(this);
|
||||
}
|
||||
}
|
||||
|
||||
private VolumeInfo(Builder b) {
|
||||
|
||||
this.rootDir = b.rootDir;
|
||||
|
||||
this.storageType = (b.storageType != null ?
|
||||
b.storageType : StorageType.DEFAULT);
|
||||
|
||||
this.configuredCapacity = (b.configuredCapacity != 0 ?
|
||||
b.configuredCapacity : -1);
|
||||
|
||||
this.state = (b.state != null ? b.state : VolumeState.NOT_FORMATTED);
|
||||
|
||||
LOG.info("Creating Volume : " + rootDir + " of storage type : " +
|
||||
storageType + " and capacity : " + configuredCapacity);
|
||||
}
|
||||
|
||||
public void addSpaceUsed(long spaceUsed) {
|
||||
this.scmUsed.getAndAdd(spaceUsed);
|
||||
}
|
||||
|
||||
public long getAvailable() {
|
||||
return configuredCapacity - scmUsed.get();
|
||||
}
|
||||
|
||||
public void setState(VolumeState state) {
|
||||
this.state = state;
|
||||
}
|
||||
|
||||
public boolean isFailed() {
|
||||
return (state == VolumeState.FAILED);
|
||||
}
|
||||
|
||||
public Path getRootDir() {
|
||||
return this.rootDir;
|
||||
}
|
||||
|
||||
public StorageType getStorageType() {
|
||||
return this.storageType;
|
||||
}
|
||||
|
||||
public enum VolumeState {
|
||||
NORMAL,
|
||||
FAILED,
|
||||
NON_EXISTENT,
|
||||
NOT_FORMATTED,
|
||||
}
|
||||
}
|
|
@ -0,0 +1,250 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.container.common.impl;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
||||
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
||||
import org.apache.hadoop.ozone.container.common.impl.VolumeInfo.VolumeState;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
||||
import org.apache.hadoop.util.InstrumentedLock;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.EnumMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
/**
|
||||
* VolumeSet to manage volumes in a DataNode.
|
||||
*/
|
||||
public class VolumeSet {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(VolumeSet.class);
|
||||
|
||||
private Configuration conf;
|
||||
/**
|
||||
* {@link VolumeSet#volumeList} maintains a list of active volumes in the
|
||||
* DataNode. Each volume has one-to-one mapping with a volumeInfo object.
|
||||
*/
|
||||
private List<VolumeInfo> volumeList;
|
||||
/**
|
||||
* {@link VolumeSet#failedVolumeList} maintains a list of volumes which have
|
||||
* failed. This list is mutually exclusive to {@link VolumeSet#volumeList}.
|
||||
*/
|
||||
private List<VolumeInfo> failedVolumeList;
|
||||
/**
|
||||
* {@link VolumeSet#volumeMap} maintains a map of all volumes in the
|
||||
* DataNode irrespective of their state.
|
||||
*/
|
||||
private Map<Path, VolumeInfo> volumeMap;
|
||||
/**
|
||||
* {@link VolumeSet#volumeStateMap} maintains a list of volumes per
|
||||
* StorageType.
|
||||
*/
|
||||
private EnumMap<StorageType, List<VolumeInfo>> volumeStateMap;
|
||||
|
||||
/**
|
||||
* Lock to synchronize changes to the VolumeSet. Any update to
|
||||
* {@link VolumeSet#volumeList}, {@link VolumeSet#failedVolumeList},
|
||||
* {@link VolumeSet#volumeMap} or {@link VolumeSet#volumeStateMap} should
|
||||
* be done after acquiring this lock.
|
||||
*/
|
||||
private final AutoCloseableLock volumeSetLock;
|
||||
|
||||
public VolumeSet(Configuration conf) throws DiskOutOfSpaceException {
|
||||
this.conf = conf;
|
||||
this.volumeSetLock = new AutoCloseableLock(
|
||||
new InstrumentedLock(getClass().getName(), LOG,
|
||||
new ReentrantLock(true),
|
||||
conf.getTimeDuration(
|
||||
DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
|
||||
DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
|
||||
TimeUnit.MILLISECONDS),
|
||||
300));
|
||||
|
||||
initializeVolumeSet();
|
||||
}
|
||||
|
||||
// Add DN volumes configured through ConfigKeys to volumeMap.
|
||||
private void initializeVolumeSet() throws DiskOutOfSpaceException {
|
||||
volumeList = new ArrayList<>();
|
||||
failedVolumeList = new ArrayList<>();
|
||||
volumeMap = new ConcurrentHashMap<>();
|
||||
volumeStateMap = new EnumMap<>(StorageType.class);
|
||||
|
||||
Collection<String> datanodeDirs = conf.getTrimmedStringCollection(
|
||||
HDDS_DATANODE_DIR_KEY);
|
||||
if (datanodeDirs.isEmpty()) {
|
||||
datanodeDirs = conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);
|
||||
}
|
||||
if (datanodeDirs.isEmpty()) {
|
||||
throw new IllegalArgumentException("No location configured in either "
|
||||
+ HDDS_DATANODE_DIR_KEY + " or " + DFS_DATANODE_DATA_DIR_KEY);
|
||||
}
|
||||
|
||||
for (StorageType storageType : StorageType.values()) {
|
||||
volumeStateMap.put(storageType, new ArrayList<VolumeInfo>());
|
||||
}
|
||||
|
||||
for (String dir : datanodeDirs) {
|
||||
try {
|
||||
VolumeInfo volumeInfo = getVolumeInfo(dir);
|
||||
|
||||
volumeList.add(volumeInfo);
|
||||
volumeMap.put(volumeInfo.getRootDir(), volumeInfo);
|
||||
volumeStateMap.get(volumeInfo.getStorageType()).add(volumeInfo);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to parse the storage location: " + dir, e);
|
||||
}
|
||||
}
|
||||
|
||||
if (volumeList.size() == 0) {
|
||||
throw new DiskOutOfSpaceException("No storage location configured");
|
||||
}
|
||||
}
|
||||
|
||||
public void acquireLock() {
|
||||
volumeSetLock.acquire();
|
||||
}
|
||||
|
||||
public void releaseLock() {
|
||||
volumeSetLock.release();
|
||||
}
|
||||
|
||||
private VolumeInfo getVolumeInfo(String rootDir) throws IOException {
|
||||
StorageLocation location = StorageLocation.parse(rootDir);
|
||||
StorageType storageType = location.getStorageType();
|
||||
|
||||
VolumeInfo.Builder volumeBuilder = new VolumeInfo.Builder(rootDir);
|
||||
volumeBuilder.storageType(storageType);
|
||||
return volumeBuilder.build();
|
||||
}
|
||||
|
||||
// Add a volume to VolumeSet
|
||||
public void addVolume(String dataDir) throws IOException {
|
||||
Path dirPath = new Path(dataDir);
|
||||
|
||||
try (AutoCloseableLock lock = volumeSetLock.acquire()) {
|
||||
if (volumeMap.containsKey(dirPath)) {
|
||||
VolumeInfo volumeInfo = volumeMap.get(dirPath);
|
||||
if (volumeInfo.isFailed()) {
|
||||
volumeInfo.setState(VolumeState.NORMAL);
|
||||
failedVolumeList.remove(volumeInfo);
|
||||
volumeList.add(volumeInfo);
|
||||
} else {
|
||||
LOG.warn("Volume : " + volumeInfo.getRootDir() + " already " +
|
||||
"exists in VolumeMap");
|
||||
}
|
||||
} else {
|
||||
VolumeInfo volumeInfo = getVolumeInfo(dataDir);
|
||||
|
||||
volumeList.add(volumeInfo);
|
||||
volumeMap.put(volumeInfo.getRootDir(), volumeInfo);
|
||||
volumeStateMap.get(volumeInfo.getStorageType()).add(volumeInfo);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Mark a volume as failed
|
||||
public void failVolume(String dataDir) {
|
||||
Path dirPath = new Path(dataDir);
|
||||
|
||||
try (AutoCloseableLock lock = volumeSetLock.acquire()) {
|
||||
if (volumeMap.containsKey(dirPath)) {
|
||||
VolumeInfo volumeInfo = volumeMap.get(dirPath);
|
||||
if (!volumeInfo.isFailed()) {
|
||||
volumeInfo.setState(VolumeState.FAILED);
|
||||
volumeList.remove(volumeInfo);
|
||||
failedVolumeList.add(volumeInfo);
|
||||
}
|
||||
} else {
|
||||
LOG.warn("Volume : " + dataDir + " does not exist in VolumeMap");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Remove a volume from the VolumeSet completely.
|
||||
public void removeVolume(String dataDir) throws IOException {
|
||||
Path dirPath = new Path(dataDir);
|
||||
|
||||
try (AutoCloseableLock lock = volumeSetLock.acquire()) {
|
||||
if (volumeMap.containsKey(dirPath)) {
|
||||
VolumeInfo volumeInfo = volumeMap.get(dirPath);
|
||||
if (!volumeInfo.isFailed()) {
|
||||
volumeList.remove(volumeInfo);
|
||||
} else {
|
||||
failedVolumeList.remove(volumeInfo);
|
||||
}
|
||||
volumeMap.remove(dirPath);
|
||||
volumeStateMap.get(volumeInfo.getStorageType()).remove(volumeInfo);
|
||||
} else {
|
||||
LOG.warn("Volume: " + dataDir + " does not exist in " + "volumeMap.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return an iterator over {@link VolumeSet#volumeList}.
|
||||
*/
|
||||
public Iterator<VolumeInfo> getIterator() {
|
||||
return volumeList.iterator();
|
||||
}
|
||||
|
||||
public VolumeInfo chooseVolume(long containerSize,
|
||||
VolumeChoosingPolicy choosingPolicy) throws IOException {
|
||||
return choosingPolicy.chooseVolume(volumeList, containerSize);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public List<VolumeInfo> getVolumesList() {
|
||||
return ImmutableList.copyOf(volumeList);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public List<VolumeInfo> getFailedVolumesList() {
|
||||
return ImmutableList.copyOf(failedVolumeList);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public Map<Path, VolumeInfo> getVolumeMap() {
|
||||
return ImmutableMap.copyOf(volumeMap);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public Map<StorageType, List<VolumeInfo>> getVolumeStateMap() {
|
||||
return ImmutableMap.copyOf(volumeStateMap);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,46 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.container.common.interfaces;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.ozone.container.common.impl.VolumeInfo;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* This interface specifies the policy for choosing volumes to store replicas.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface VolumeChoosingPolicy {
|
||||
|
||||
/**
|
||||
* Choose a volume to place a container,
|
||||
* given a list of volumes and the max container size sought for storage.
|
||||
*
|
||||
* The implementations of this interface must be thread-safe.
|
||||
*
|
||||
* @param volumes - a list of available volumes.
|
||||
* @param maxContainerSize - the maximum size of the container for which a
|
||||
* volume is sought.
|
||||
* @return the chosen volume.
|
||||
* @throws IOException when disks are unavailable or are full.
|
||||
*/
|
||||
VolumeInfo chooseVolume(List<VolumeInfo> volumes, long maxContainerSize)
|
||||
throws IOException;
|
||||
}
|
|
@ -0,0 +1,100 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.container.common.impl;
|
||||
|
||||
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Tests {@link RoundRobinVolumeChoosingPolicy}.
|
||||
*/
|
||||
public class TestRoundRobinVolumeChoosingPolicy {
|
||||
|
||||
private RoundRobinVolumeChoosingPolicy policy;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
policy = ReflectionUtils.newInstance(
|
||||
RoundRobinVolumeChoosingPolicy.class, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRRVolumeChoosingPolicy() throws Exception {
|
||||
final List<VolumeInfo> volumes = new ArrayList<>();
|
||||
|
||||
// First volume, with 100 bytes of space.
|
||||
volumes.add(Mockito.mock(VolumeInfo.class));
|
||||
Mockito.when(volumes.get(0).getAvailable()).thenReturn(100L);
|
||||
|
||||
// Second volume, with 200 bytes of space.
|
||||
volumes.add(Mockito.mock(VolumeInfo.class));
|
||||
Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L);
|
||||
|
||||
// Test two rounds of round-robin choosing
|
||||
Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0));
|
||||
Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0));
|
||||
Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0));
|
||||
Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0));
|
||||
|
||||
// The first volume has only 100L space, so the policy should
|
||||
// choose the second one in case we ask for more.
|
||||
Assert.assertEquals(volumes.get(1),
|
||||
policy.chooseVolume(volumes, 150));
|
||||
|
||||
// Fail if no volume has enough space available
|
||||
try {
|
||||
policy.chooseVolume(volumes, Long.MAX_VALUE);
|
||||
Assert.fail();
|
||||
} catch (IOException e) {
|
||||
// Passed.
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRRPolicyExceptionMessage() throws Exception {
|
||||
final List<VolumeInfo> volumes = new ArrayList<>();
|
||||
|
||||
// First volume, with 100 bytes of space.
|
||||
volumes.add(Mockito.mock(VolumeInfo.class));
|
||||
Mockito.when(volumes.get(0).getAvailable()).thenReturn(100L);
|
||||
|
||||
// Second volume, with 200 bytes of space.
|
||||
volumes.add(Mockito.mock(VolumeInfo.class));
|
||||
Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L);
|
||||
|
||||
int blockSize = 300;
|
||||
try {
|
||||
policy.chooseVolume(volumes, blockSize);
|
||||
Assert.fail("expected to throw DiskOutOfSpaceException");
|
||||
} catch(DiskOutOfSpaceException e) {
|
||||
Assert.assertEquals("Not returnig the expected message",
|
||||
"Out of space: The volume with the most available space (=" + 200
|
||||
+ " B) is less than the container size (=" + blockSize + " B).",
|
||||
e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,138 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.container.common.interfaces;
|
||||
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.ozone.container.common.impl.VolumeInfo;
|
||||
import org.apache.hadoop.ozone.container.common.impl.VolumeSet;
|
||||
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.Timeout;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Tests {@link VolumeSet} operations.
|
||||
*/
|
||||
public class TestVolumeSet {
|
||||
|
||||
private OzoneConfiguration conf;
|
||||
protected VolumeSet volumeSet;
|
||||
protected final String baseDir = MiniDFSCluster.getBaseDirectory();
|
||||
protected final String volume1 = baseDir + "disk1";
|
||||
protected final String volume2 = baseDir + "disk2";
|
||||
private final List<String> volumes = new ArrayList<>();
|
||||
|
||||
private void initializeVolumeSet() throws Exception {
|
||||
volumeSet = new VolumeSet(conf);
|
||||
}
|
||||
|
||||
@Rule
|
||||
public Timeout testTimeout = new Timeout(300_000);
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
conf = new OzoneConfiguration();
|
||||
String dataDirKey = volume1 + "," + volume2;
|
||||
volumes.add(volume1);
|
||||
volumes.add(volume2);
|
||||
conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dataDirKey);
|
||||
initializeVolumeSet();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testVolumeSetInitialization() throws Exception {
|
||||
|
||||
List<VolumeInfo> volumesList = volumeSet.getVolumesList();
|
||||
|
||||
// VolumeSet initialization should add volume1 and volume2 to VolumeSet
|
||||
assertEquals("VolumeSet intialization is incorrect",
|
||||
volumesList.size(), volumes.size());
|
||||
assertEquals(volume1, volumesList.get(0).getRootDir().toString());
|
||||
assertEquals(volume2, volumesList.get(1).getRootDir().toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddVolume() throws Exception {
|
||||
|
||||
List<VolumeInfo> volumesList = volumeSet.getVolumesList();
|
||||
assertEquals(2, volumeSet.getVolumesList().size());
|
||||
|
||||
// Add a volume to VolumeSet
|
||||
String volume3 = baseDir + "disk3";
|
||||
volumeSet.addVolume(volume3);
|
||||
|
||||
assertEquals(3, volumeSet.getVolumesList().size());
|
||||
assertEquals("AddVolume did not add requested volume to VolumeSet",
|
||||
volume3,
|
||||
volumeSet.getVolumesList().get(2).getRootDir().toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailVolume() throws Exception {
|
||||
|
||||
//Fail a volume
|
||||
volumeSet.failVolume(volume1);
|
||||
|
||||
// Failed volume should not show up in the volumeList
|
||||
assertEquals(1, volumeSet.getVolumesList().size());
|
||||
|
||||
// Failed volume should be added to FailedVolumeList
|
||||
assertEquals("Failed volume not present in FailedVolumeList",
|
||||
1, volumeSet.getFailedVolumesList().size());
|
||||
assertEquals("Failed Volume list did not match", volume1,
|
||||
volumeSet.getFailedVolumesList().get(0).getRootDir().toString());
|
||||
|
||||
// Failed volume should exist in VolumeMap with isFailed flag set to true
|
||||
Path volume1Path = new Path(volume1);
|
||||
assertTrue(volumeSet.getVolumeMap().containsKey(volume1Path));
|
||||
assertTrue(volumeSet.getVolumeMap().get(volume1Path).isFailed());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveVolume() throws Exception {
|
||||
|
||||
List<VolumeInfo> volumesList = volumeSet.getVolumesList();
|
||||
assertEquals(2, volumeSet.getVolumesList().size());
|
||||
|
||||
// Remove a volume from VolumeSet
|
||||
volumeSet.removeVolume(volume1);
|
||||
assertEquals(1, volumeSet.getVolumesList().size());
|
||||
|
||||
// Attempting to remove a volume which does not exist in VolumeSet should
|
||||
// log a warning.
|
||||
LogCapturer logs = LogCapturer.captureLogs(
|
||||
LogFactory.getLog(VolumeSet.class));
|
||||
volumeSet.removeVolume(volume1);
|
||||
assertEquals(1, volumeSet.getVolumesList().size());
|
||||
String expectedLogMessage = "Volume: " + volume1 + " does not exist in "
|
||||
+ "volumeMap.";
|
||||
assertTrue("Log output does not contain expected log message: "
|
||||
+ expectedLogMessage, logs.getOutput().contains(expectedLogMessage));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue