From 8f4d8c43f84defd5550c3635051b056afc9ec57f Mon Sep 17 00:00:00 2001 From: Anu Engineer Date: Mon, 27 Mar 2017 17:59:49 -0700 Subject: [PATCH] HDFS-11548. Ozone: SCM: Add node pool management API. Contributed by Xiaoyu Yao --- .../ozone/scm/exceptions/SCMException.java | 91 ++++++ .../ozone/scm/exceptions/package-info.java | 18 ++ .../ozone/scm/node/NodePoolManager.java | 71 +++++ .../hadoop/ozone/scm/node/NodeStat.java | 67 +++++ .../hadoop/ozone/scm/node/SCMNodeManager.java | 18 +- .../ozone/scm/node/SCMNodePoolManager.java | 262 ++++++++++++++++++ .../hadoop/ozone/scm/node/SCMNodeStat.java | 28 +- .../org/apache/hadoop/utils/LevelDBStore.java | 4 +- .../ozone/container/common/SCMTestUtils.java | 29 ++ .../scm/node/TestContainerPlacement.java | 15 +- .../ozone/scm/node/TestNodeManager.java | 57 ++-- .../scm/node/TestSCMNodePoolManager.java | 156 +++++++++++ 12 files changed, 776 insertions(+), 40 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/package-info.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodePoolManager.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeStat.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodePoolManager.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodePoolManager.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java new file mode 100644 index 00000000000..7acec4df376 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.scm.exceptions; +import java.io.IOException; + +/** + * Exception thrown by SCM. + */ +public class SCMException extends IOException { + /** + * Error codes to make it easy to decode these exceptions. + */ + public enum ResultCodes { + FAILED_TO_LOAD_NODEPOOL, + NODE_NOT_FOUND_IN_NODEPOOL, + } + private final ResultCodes result; + + /** + * Constructs an {@code IOException} with {@code null} + * as its error detail message. + */ + public SCMException(ResultCodes result) { + this.result = result; + } + + /** + * Constructs an {@code IOException} with the specified detail message. + * + * @param message The detail message (which is saved for later retrieval by + * the + * {@link #getMessage()} method) + */ + public SCMException(String message, ResultCodes result) { + super(message); + this.result = result; + } + + /** + * Constructs an {@code IOException} with the specified detail message + * and cause. + *

+ *

Note that the detail message associated with {@code cause} is + * not automatically incorporated into this exception's detail + * message. + * + * @param message The detail message (which is saved for later retrieval by + * the + * {@link #getMessage()} method) + * @param cause The cause (which is saved for later retrieval by the {@link + * #getCause()} method). (A null value is permitted, and indicates that the + * cause is nonexistent or unknown.) + * @since 1.6 + */ + public SCMException(String message, Throwable cause, ResultCodes result) { + super(message, cause); + this.result = result; + } + + /** + * Constructs an {@code IOException} with the specified cause and a + * detail message of {@code (cause==null ? null : cause.toString())} + * (which typically contains the class and detail message of {@code cause}). + * This constructor is useful for IO exceptions that are little more + * than wrappers for other throwables. + * + * @param cause The cause (which is saved for later retrieval by the {@link + * #getCause()} method). (A null value is permitted, and indicates that the + * cause is nonexistent or unknown.) + * @since 1.6 + */ + public SCMException(Throwable cause, ResultCodes result) { + super(cause); + this.result = result; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/package-info.java new file mode 100644 index 00000000000..0922382d145 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.scm.exceptions; +// Exceptions thrown by SCM. \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodePoolManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodePoolManager.java new file mode 100644 index 00000000000..f91c7052edb --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodePoolManager.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.scm.node; + + +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.scm.exceptions.SCMException; + +import java.io.Closeable; +import java.util.List; + +/** + * Interface that defines SCM NodePoolManager. + */ +public interface NodePoolManager extends Closeable { + + /** + * Add a node to a node pool. + * @param pool - name of the node pool. + * @param node - data node. + */ + void addNode(String pool, DatanodeID node); + + /** + * Remove a node from a node pool. + * @param pool - name of the node pool. + * @param node - data node. + * @throws SCMException + */ + void removeNode(String pool, DatanodeID node) + throws SCMException; + + /** + * Get a list of known node pools. + * @return a list of known node pool names or an empty list if not node pool + * is defined. + */ + List getNodePools(); + + /** + * Get all nodes of a node pool given the name of the node pool. + * @param pool - name of the node pool. + * @return a list of datanode ids or an empty list if the node pool was not + * found. + */ + List getNodes(String pool); + + /** + * Get the node pool name if the node has been added to a node pool. + * @param datanodeID - datanode ID. + * @return node pool name if it has been assigned. + * null if the node has not been assigned to any node pool yet. + */ + String getNodePool(DatanodeID datanodeID); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeStat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeStat.java new file mode 100644 index 00000000000..d6875f2a074 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeStat.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.scm.node; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Interface that defines Node Stats. + */ +interface NodeStat { + /** + * Get capacity of the node. + * @return capacity of the node. + */ + long getCapacity(); + + /** + * Get the used space of the node. + * @return the used space of the node. + */ + long getScmUsed(); + + /** + * Get the remaining space of the node. + * @return the remaining space of the node. + */ + long getRemaining(); + + /** + * Set the total/used/remaining space. + * @param total - total space. + * @param used - used space. + * @param remain - remaining space. + */ + @VisibleForTesting + void set(long total, long used, long remain); + + /** + * Adding of the stat. + * @param stat - stat to be added. + * @return updated node stat. + */ + NodeStat add(NodeStat stat); + + /** + * Subtract of the stat. + * @param stat - stat to be subtracted. + * @return updated nodestat. + */ + NodeStat subtract(NodeStat stat); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java index 214af740c02..4cf756b39cb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java @@ -22,11 +22,11 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.collections.map.HashedMap; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.ozone.OzoneClientUtils; +import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol; import org.apache.hadoop.ozone.protocol.VersionResponse; import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; @@ -127,10 +127,15 @@ public class SCMNodeManager private final CommandQueue commandQueue; // Node manager MXBean private ObjectName nmInfoBean; + + // Node pool manager. + private final SCMNodePoolManager nodePoolManager; + /** * Constructs SCM machine Manager. */ - public SCMNodeManager(Configuration conf, String clusterID) { + public SCMNodeManager(OzoneConfiguration conf, String clusterID) + throws IOException { heartbeatQueue = new ConcurrentLinkedQueue<>(); healthyNodes = new ConcurrentHashMap<>(); deadNodes = new ConcurrentHashMap<>(); @@ -167,6 +172,8 @@ public class SCMNodeManager TimeUnit.MILLISECONDS); registerMXBean(); + + this.nodePoolManager = new SCMNodePoolManager(conf); } private void registerMXBean() { @@ -671,6 +678,13 @@ public class SCMNodeManager healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow()); healthyNodeCount.incrementAndGet(); nodeStats.put(datanodeID.getDatanodeUuid(), new SCMNodeStat()); + + // TODO: define node pool policy for non-default node pool. + // For now, all nodes are added to the "DefaultNodePool" upon registration + // if it has not been added to any node pool yet. + if (nodePoolManager.getNodePool(datanodeID) == null) { + nodePoolManager.addNode(SCMNodePoolManager.DEFAULT_NODEPOOL, datanodeID); + } LOG.info("Data node with ID: {} Registered.", datanodeID.getDatanodeUuid()); return RegisteredCommand.newBuilder() diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodePoolManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodePoolManager.java new file mode 100644 index 00000000000..4c3395479a6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodePoolManager.java @@ -0,0 +1,262 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.scm.node; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.scm.exceptions.SCMException; +import org.apache.hadoop.utils.LevelDBStore; +import org.iq80.leveldb.DBIterator; +import org.iq80.leveldb.Options; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +import static org.apache.hadoop.ozone.scm + .exceptions.SCMException.ResultCodes.FAILED_TO_LOAD_NODEPOOL; +import static org.apache.hadoop.ozone.scm + .exceptions.SCMException.ResultCodes.NODE_NOT_FOUND_IN_NODEPOOL; +import static org.apache.hadoop.scm + .ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT; +import static org.apache.hadoop.scm + .ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB; + +/** + * SCM node pool manager that manges node pools. + */ +public final class SCMNodePoolManager implements NodePoolManager { + + private static final Logger LOG = + LoggerFactory.getLogger(SCMNodePoolManager.class); + private static final List EMPTY_NODE_LIST = new ArrayList<>(); + private static final List EMPTY_NODEPOOL_LIST = new ArrayList<>(); + public static final String DEFAULT_NODEPOOL = "DefaultNodePool"; + + // DB that saves the node to node pool mapping. + private LevelDBStore nodePoolStore; + + // In-memory node pool to nodes mapping + private HashMap> nodePools; + + // Read-write lock for nodepool operations + private ReadWriteLock lock; + + /** + * Construct SCMNodePoolManager class that manages node to node pool mapping. + * @param conf - configuration. + * @throws IOException + */ + public SCMNodePoolManager(final OzoneConfiguration conf) + throws IOException { + final int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB, + OZONE_SCM_DB_CACHE_SIZE_DEFAULT); + String scmMetaDataDir = conf.get(OzoneConfigKeys + .OZONE_CONTAINER_METADATA_DIRS); + if (scmMetaDataDir == null) { + throw new IllegalArgumentException("SCM metadata directory is invalid."); + } + Options options = new Options(); + options.cacheSize(cacheSize * OzoneConsts.MB); + options.createIfMissing(); + + File nodePoolDBPath = new File(scmMetaDataDir, "nodepool.db"); + nodePoolStore = new LevelDBStore(nodePoolDBPath, options); + nodePools = new HashMap<>(); + lock = new ReentrantReadWriteLock(); + init(); + } + + /** + * Initialize the in-memory store based on persist store from level db. + * No lock is needed as init() is only invoked by constructor. + * @throws SCMException + */ + private void init() throws SCMException { + try (DBIterator iter = nodePoolStore.getIterator()) { + for (iter.seekToFirst(); iter.hasNext(); iter.next()) { + try { + byte[] key = iter.peekNext().getKey(); + DatanodeID nodeId = DatanodeID.getFromProtoBuf( + HdfsProtos.DatanodeIDProto.PARSER.parseFrom(key)); + + byte[] value = iter.peekNext().getValue(); + String poolName = DFSUtil.bytes2String(value); + + Set nodePool = null; + if (nodePools.containsKey(poolName)) { + nodePool = nodePools.get(poolName); + } else { + nodePool = new HashSet<>(); + nodePools.put(poolName, nodePool); + } + nodePool.add(nodeId); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding node: {} to node pool: {}", nodeId, poolName); + } + } catch (Exception ex) { + LOG.warn("Can't add a datanode to node pool, continue next..."); + } + } + } catch (IOException e) { + LOG.error("Loading node pool error " + e); + throw new SCMException("Failed to load node pool", + FAILED_TO_LOAD_NODEPOOL); + } + } + + /** + * Add a datanode to a node pool. + * @param pool - name of the node pool. + * @param node - name of the datanode. + */ + @Override + public void addNode(final String pool, final DatanodeID node) { + Preconditions.checkNotNull(pool, "pool name is null"); + Preconditions.checkNotNull(node, "node is null"); + lock.writeLock().lock(); + try { + // add to the persistent store + nodePoolStore.put(node.getProtoBufMessage().toByteArray(), + DFSUtil.string2Bytes(pool)); + + // add to the in-memory store + Set nodePool = null; + if (nodePools.containsKey(pool)) { + nodePool = nodePools.get(pool); + } else { + nodePool = new HashSet(); + nodePools.put(pool, nodePool); + } + nodePool.add(node); + } finally { + lock.writeLock().unlock(); + } + } + + /** + * Remove a datanode from a node pool. + * @param pool - name of the node pool. + * @param node - datanode id. + * @throws SCMException + */ + @Override + public void removeNode(final String pool, final DatanodeID node) + throws SCMException { + Preconditions.checkNotNull(pool, "pool name is null"); + Preconditions.checkNotNull(node, "node is null"); + lock.writeLock().lock(); + try { + // Remove from the persistent store + byte[] kName = node.getProtoBufMessage().toByteArray(); + byte[] kData = nodePoolStore.get(kName); + if (kData == null) { + throw new SCMException(String.format("Unable to find node %s from" + + " pool %s in DB.", DFSUtil.bytes2String(kName), pool), + NODE_NOT_FOUND_IN_NODEPOOL); + } + nodePoolStore.delete(kName); + + // Remove from the in-memory store + if (nodePools.containsKey(pool)) { + Set nodePool = nodePools.get(pool); + nodePool.remove(node); + } else { + throw new SCMException(String.format("Unable to find node %s from" + + " pool %s in MAP.", DFSUtil.bytes2String(kName), pool), + NODE_NOT_FOUND_IN_NODEPOOL); } + } finally { + lock.writeLock().unlock(); + } + } + + /** + * Get all the node pools. + * @return all the node pools. + */ + @Override + public List getNodePools() { + lock.readLock().lock(); + try { + if (!nodePools.isEmpty()) { + return nodePools.keySet().stream().collect(Collectors.toList()); + } else { + return EMPTY_NODEPOOL_LIST; + } + } finally { + lock.readLock().unlock(); + } + } + + /** + * Get all datanodes of a specific node pool. + * @param pool - name of the node pool. + * @return all datanodes of the specified node pool. + */ + @Override + public List getNodes(final String pool) { + Preconditions.checkNotNull(pool, "pool name is null"); + if (nodePools.containsKey(pool)) { + return nodePools.get(pool).stream().collect(Collectors.toList()); + } else { + return EMPTY_NODE_LIST; + } + } + + /** + * Get the node pool name if the node has been added to a node pool. + * @param datanodeID - datanode ID. + * @return node pool name if it has been assigned. + * null if the node has not been assigned to any node pool yet. + * TODO: Put this in a in-memory map if performance is an issue. + */ + @Override + public String getNodePool(final DatanodeID datanodeID) { + Preconditions.checkNotNull(datanodeID, "node is null"); + byte[] result = nodePoolStore.get( + datanodeID.getProtoBufMessage().toByteArray()); + if (result == null) { + return null; + } + return DFSUtil.bytes2String(result); + } + + /** + * Close node pool level db store. + * @throws IOException + */ + @Override + public void close() throws IOException { + nodePoolStore.close(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeStat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeStat.java index c206807ede5..6089e4ed2bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeStat.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeStat.java @@ -23,7 +23,7 @@ import com.google.common.annotations.VisibleForTesting; /** * This class represents the SCM node stat. */ -public class SCMNodeStat { +public final class SCMNodeStat implements NodeStat { private long capacity; private long scmUsed; private long remaining; @@ -31,13 +31,14 @@ public class SCMNodeStat { public SCMNodeStat() { } - public SCMNodeStat(SCMNodeStat other) { + public SCMNodeStat(final SCMNodeStat other) { set(other.capacity, other.scmUsed, other.remaining); } /** * @return the total configured capacity of the node. */ + @Override public long getCapacity() { return capacity; } @@ -45,6 +46,7 @@ public class SCMNodeStat { /** * @return the total SCM used space on the node. */ + @Override public long getScmUsed() { return scmUsed; } @@ -52,25 +54,29 @@ public class SCMNodeStat { /** * @return the total remaining space available on the node. */ + @Override public long getRemaining() { return remaining; } @VisibleForTesting - public void set(long total, long used, long remain) { + @Override + public void set(final long total, final long used, final long remain) { this.capacity = total; this.scmUsed = used; this.remaining = remain; } - public SCMNodeStat add(SCMNodeStat stat) { + @Override + public SCMNodeStat add(final NodeStat stat) { this.capacity += stat.getCapacity(); this.scmUsed += stat.getScmUsed(); this.remaining += stat.getRemaining(); return this; } - public SCMNodeStat subtract(SCMNodeStat stat) { + @Override + public SCMNodeStat subtract(final NodeStat stat) { this.capacity -= stat.getCapacity(); this.scmUsed -= stat.getScmUsed(); this.remaining -= stat.getRemaining(); @@ -78,12 +84,12 @@ public class SCMNodeStat { } @Override - public boolean equals(Object to) { - return this == to || - (to instanceof SCMNodeStat && - capacity == ((SCMNodeStat) to).getCapacity() && - scmUsed == ((SCMNodeStat) to).getScmUsed() && - remaining == ((SCMNodeStat) to).getRemaining()); + public boolean equals(final Object to) { + return this == to + || (to instanceof SCMNodeStat + && capacity == ((SCMNodeStat) to).getCapacity() + && scmUsed == ((SCMNodeStat) to).getScmUsed() + && remaining == ((SCMNodeStat) to).getRemaining()); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBStore.java index 9c7a70d1096..46a29b45460 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBStore.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBStore.java @@ -24,13 +24,14 @@ import org.iq80.leveldb.DBIterator; import org.iq80.leveldb.Options; import org.iq80.leveldb.WriteOptions; +import java.io.Closeable; import java.io.File; import java.io.IOException; /** * LevelDB interface. */ -public class LevelDBStore { +public class LevelDBStore implements Closeable { private DB db; private final File dbFile; private final Options dbOptions; @@ -106,6 +107,7 @@ public class LevelDBStore { * * @throws IOException */ + @Override public void close() throws IOException { db.close(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java index c09693b0927..252ff670a17 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java @@ -38,6 +38,8 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.List; import java.util.Random; import java.util.UUID; @@ -165,6 +167,33 @@ public final class SCMTestUtils { return new DatanodeID(command.getDatanodeUUID(), tempDataNode); } + /** + * Get specified number of datanode IDs and registered them with node manager. + * @param nodeManager - node manager to register the datanode ids. + * @param count - number of datanode IDs needed. + * @return + */ + public static List getRegisteredDatanodeIDs( + SCMNodeManager nodeManager, int count) { + ArrayList datanodes = new ArrayList<>(); + for (int i = 0; i < count; i++) { + datanodes.add(getDatanodeID(nodeManager)); + } + return datanodes; + } + + /** + * Get specified number of datanode IDs. + * @param count - number of datanode IDs needed. + * @return + */ + public static List getDatanodeIDs(int count) { + ArrayList datanodes = new ArrayList<>(); + for (int i = 0; i < count; i++) { + datanodes.add(getDatanodeID()); + } + return datanodes; + } /** * Get a datanode ID. * diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java index 9e99d70fa74..02f1486a376 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java @@ -41,7 +41,6 @@ import org.junit.rules.ExpectedException; import java.io.File; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.TimeoutException; @@ -66,7 +65,7 @@ public class TestContainerPlacement { * * @return Config */ - Configuration getConf() { + OzoneConfiguration getConf() { return new OzoneConfiguration(); } @@ -78,7 +77,8 @@ public class TestContainerPlacement { * @throws IOException */ - SCMNodeManager createNodeManager(Configuration config) throws IOException { + SCMNodeManager createNodeManager(OzoneConfiguration config) + throws IOException { SCMNodeManager nodeManager = new SCMNodeManager(config, UUID.randomUUID().toString()); assertFalse("Node manager should be in chill mode", @@ -103,7 +103,7 @@ public class TestContainerPlacement { @Test public void testContainerPlacementCapacity() throws IOException, InterruptedException, TimeoutException { - Configuration conf = getConf(); + OzoneConfiguration conf = getConf(); final int nodeCount = 4; final long capacity = 10L * OzoneConsts.GB; final long used = 2L * OzoneConsts.GB; @@ -119,11 +119,8 @@ public class TestContainerPlacement { SCMNodeManager nodeManager = createNodeManager(conf); ContainerMapping containerManager = createContainerManager(conf, nodeManager); - List datanodes = new ArrayList<>(nodeCount); - for (int i = 0; i < nodeCount; i++) { - datanodes.add(SCMTestUtils.getDatanodeID(nodeManager)); - } - + List datanodes = + SCMTestUtils.getRegisteredDatanodeIDs(nodeManager, nodeCount); try { for (DatanodeID datanodeID: datanodes) { StorageContainerDatanodeProtocolProtos.SCMNodeReport.Builder nrb = diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java index 15586bc98b1..2eeb88910b9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java @@ -17,8 +17,9 @@ */ package org.apache.hadoop.ozone.scm.node; -import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.container.common.SCMTestUtils; import org.apache.hadoop.ozone.protocol.proto @@ -27,13 +28,17 @@ import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMStorageReport; import org.apache.hadoop.scm.ScmConfigKeys; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.PathUtils; import org.hamcrest.CoreMatchers; import org.junit.Assert; +import org.junit.After; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import java.io.File; import java.io.IOException; import java.util.LinkedList; import java.util.List; @@ -59,6 +64,9 @@ import static org.junit.Assert.assertTrue; * Test the Node Manager class. */ public class TestNodeManager { + + private File testDir; + @Rule public ExpectedException thrown = ExpectedException.none(); @@ -66,13 +74,27 @@ public class TestNodeManager { public static void init() throws IOException { } + @Before + public void setup() { + testDir = PathUtils.getTestDir( + TestNodeManager.class); + } + + @After + public void cleanup() { + FileUtil.fullyDelete(testDir); + } + /** * Returns a new copy of Configuration. * * @return Config */ - Configuration getConf() { - return new OzoneConfiguration(); + OzoneConfiguration getConf() { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.set(OzoneConfigKeys.OZONE_CONTAINER_METADATA_DIRS, + testDir.getAbsolutePath()); + return conf; } /** @@ -83,7 +105,8 @@ public class TestNodeManager { * @throws IOException */ - SCMNodeManager createNodeManager(Configuration config) throws IOException { + SCMNodeManager createNodeManager(OzoneConfiguration config) + throws IOException { SCMNodeManager nodeManager = new SCMNodeManager(config, UUID.randomUUID().toString()); assertFalse("Node manager should be in chill mode", @@ -201,7 +224,7 @@ public class TestNodeManager { @Test public void testScmShutdown() throws IOException, InterruptedException, TimeoutException { - Configuration conf = getConf(); + OzoneConfiguration conf = getConf(); conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 100); SCMNodeManager nodeManager = createNodeManager(conf); DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager); @@ -228,7 +251,7 @@ public class TestNodeManager { @Test public void testScmHealthyNodeCount() throws IOException, InterruptedException, TimeoutException { - Configuration conf = getConf(); + OzoneConfiguration conf = getConf(); final int count = 10; try (SCMNodeManager nodeManager = createNodeManager(conf)) { @@ -256,7 +279,7 @@ public class TestNodeManager { @Test public void testScmSanityOfUserConfig1() throws IOException, InterruptedException, TimeoutException { - Configuration conf = getConf(); + OzoneConfiguration conf = getConf(); final int interval = 100; conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, interval); conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1); @@ -284,7 +307,7 @@ public class TestNodeManager { @Test public void testScmSanityOfUserConfig2() throws IOException, InterruptedException, TimeoutException { - Configuration conf = getConf(); + OzoneConfiguration conf = getConf(); final int interval = 100; conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, interval); conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1); @@ -306,7 +329,7 @@ public class TestNodeManager { @Test public void testScmDetectStaleNode() throws IOException, InterruptedException, TimeoutException { - Configuration conf = getConf(); + OzoneConfiguration conf = getConf(); final int interval = 100; final int nodeCount = 10; conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, interval); @@ -362,7 +385,7 @@ public class TestNodeManager { final int interval = 100; final int nodeCount = 10; - Configuration conf = getConf(); + OzoneConfiguration conf = getConf(); conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, interval); conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1); conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, 3 * 1000); @@ -483,7 +506,7 @@ public class TestNodeManager { * */ - Configuration conf = getConf(); + OzoneConfiguration conf = getConf(); conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 100); conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1); conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, 3 * 1000); @@ -659,7 +682,7 @@ public class TestNodeManager { final int staleCount = 100; final int deadCount = 10; - Configuration conf = getConf(); + OzoneConfiguration conf = getConf(); conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 100); conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1); conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, 3 * 1000); @@ -749,7 +772,7 @@ public class TestNodeManager { InterruptedException, TimeoutException { final int healthyCount = 3000; final int staleCount = 3000; - Configuration conf = getConf(); + OzoneConfiguration conf = getConf(); conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 100); conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1); conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, 3 * 1000); @@ -809,7 +832,7 @@ public class TestNodeManager { final int healthyCount = 3000; // Make the HB process thread run slower. - Configuration conf = getConf(); + OzoneConfiguration conf = getConf(); conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 500); conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1); conf.setInt(OZONE_SCM_MAX_HB_COUNT_TO_PROCESS, 500); @@ -845,7 +868,7 @@ public class TestNodeManager { @Test public void testScmEnterAndExistChillMode() throws IOException, InterruptedException { - Configuration conf = getConf(); + OzoneConfiguration conf = getConf(); conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 100); try (SCMNodeManager nodeManager = createNodeManager(conf)) { @@ -908,7 +931,7 @@ public class TestNodeManager { @Test public void testScmStatsFromNodeReport() throws IOException, InterruptedException, TimeoutException { - Configuration conf = getConf(); + OzoneConfiguration conf = getConf(); final int nodeCount = 10; final long capacity = 2000; final long used = 100; @@ -948,7 +971,7 @@ public class TestNodeManager { @Test public void testScmNodeReportUpdate() throws IOException, InterruptedException, TimeoutException { - Configuration conf = getConf(); + OzoneConfiguration conf = getConf(); final int heartbeatCount = 5; final int nodeCount = 1; final int interval = 100; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodePoolManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodePoolManager.java new file mode 100644 index 00000000000..365554437d3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodePoolManager.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.scm.node; + +import org.apache.commons.collections.ListUtils; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.ozone.container.common.SCMTestUtils; +import org.apache.hadoop.ozone.scm.container.ContainerPlacementPolicy; +import org.apache.hadoop.ozone.scm.container.SCMContainerPlacementCapacity; +import org.apache.hadoop.scm.ScmConfigKeys; +import org.apache.hadoop.test.PathUtils; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Test for SCM node pool manager. + */ +public class TestSCMNodePoolManager { + private static final Logger LOG = + LoggerFactory.getLogger(TestSCMNodePoolManager.class); + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + private final File testDir = PathUtils.getTestDir( + TestSCMNodePoolManager.class); + + SCMNodePoolManager createNodePoolManager(OzoneConfiguration conf) + throws IOException { + conf.set(OzoneConfigKeys.OZONE_CONTAINER_METADATA_DIRS, + testDir.getAbsolutePath()); + conf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY, + SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class); + return new SCMNodePoolManager(conf); + } + + /** + * Test default node pool. + * + * @throws IOException + */ + @Test + public void testDefaultNodePool() throws IOException { + OzoneConfiguration conf = new OzoneConfiguration(); + try { + final String defaultPool = "DefaultPool"; + NodePoolManager npMgr = createNodePoolManager(conf); + + final int nodeCount = 4; + final List nodes = SCMTestUtils.getDatanodeIDs(nodeCount); + assertEquals(0, npMgr.getNodePools().size()); + for (DatanodeID node: nodes) { + npMgr.addNode(defaultPool, node); + } + List nodesRetrieved = npMgr.getNodes(defaultPool); + assertEquals(nodeCount, nodesRetrieved.size()); + assertTwoDatanodeListsEqual(nodes, nodesRetrieved); + + DatanodeID nodeRemoved = nodes.remove(2); + npMgr.removeNode(defaultPool, nodeRemoved); + List nodesAfterRemove = npMgr.getNodes(defaultPool); + assertTwoDatanodeListsEqual(nodes, nodesAfterRemove); + + List nonExistSet = npMgr.getNodes("NonExistSet"); + assertEquals(0, nonExistSet.size()); + } finally { + FileUtil.fullyDelete(testDir); + } + } + + + /** + * Test default node pool reload. + * + * @throws IOException + */ + @Test + public void testDefaultNodePoolReload() throws IOException { + OzoneConfiguration conf = new OzoneConfiguration(); + final String defaultPool = "DefaultPool"; + final int nodeCount = 4; + final List nodes = SCMTestUtils.getDatanodeIDs(nodeCount); + + try { + try { + SCMNodePoolManager npMgr = createNodePoolManager(conf); + assertEquals(0, npMgr.getNodePools().size()); + for (DatanodeID node : nodes) { + npMgr.addNode(defaultPool, node); + } + List nodesRetrieved = npMgr.getNodes(defaultPool); + assertEquals(nodeCount, nodesRetrieved.size()); + assertTwoDatanodeListsEqual(nodes, nodesRetrieved); + npMgr.close(); + } finally { + LOG.info("testDefaultNodePoolReload: Finish adding nodes to pool" + + " and close."); + } + + // try reload with a new NodePoolManager instance + try { + SCMNodePoolManager npMgr = createNodePoolManager(conf); + List nodesRetrieved = npMgr.getNodes(defaultPool); + assertEquals(nodeCount, nodesRetrieved.size()); + assertTwoDatanodeListsEqual(nodes, nodesRetrieved); + } finally { + LOG.info("testDefaultNodePoolReload: Finish reloading node pool."); + } + } finally { + FileUtil.fullyDelete(testDir); + } + } + + /** + * Compare and verify that two datanode lists are equal. + * @param list1 - datanode list 1. + * @param list2 - datanode list 2. + */ + private void assertTwoDatanodeListsEqual(List list1, + List list2) { + assertEquals(list1.size(), list2.size()); + Collections.sort(list1); + Collections.sort(list2); + assertTrue(ListUtils.isEqualList(list1, list2)); + } +}