containerInfos = reports.getReportsList();
-
- for (StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState :
+ containerSupervisor.handleContainerReport(datanodeDetails, reports);
+ for (StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState :
containerInfos) {
byte[] dbKey = Longs.toByteArray(datanodeState.getContainerID());
lock.lock();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java
new file mode 100644
index 00000000000..5bd05746bfc
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodePoolManager;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static com.google.common.util.concurrent.Uninterruptibles
+ .sleepUninterruptibly;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+ .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+ .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+ .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+ .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+ .OZONE_SCM_MAX_CONTAINER_REPORT_THREADS;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+ .OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+ .OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+ .OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT;
+
+/**
+ * This class takes a set of container reports that belong to a pool and then
+ * computes the replication levels for each container.
+ */
+public class ContainerSupervisor implements Closeable {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(ContainerSupervisor.class);
+
+ private final NodePoolManager poolManager;
+ private final HashSet<String> poolNames;
+ private final PriorityQueue<PeriodicPool> poolQueue;
+ private final NodeManager nodeManager;
+ private final long containerProcessingLag;
+ private final AtomicBoolean runnable;
+ private final ExecutorService executorService;
+ private final long maxPoolWait;
+ private long poolProcessCount;
+ private final List<InProgressPool> inProgressPoolList;
+ private final AtomicInteger threadFaultCount;
+ private final int inProgressPoolMaxCount;
+
+ private final ReadWriteLock inProgressPoolListLock;
+
+ /**
+ * Returns the number of times we have processed pools.
+ * @return long
+ */
+ public long getPoolProcessCount() {
+ return poolProcessCount;
+ }
+
+
+ /**
+ * Constructs a class that computes Replication Levels.
+ *
+ * @param conf - OzoneConfiguration
+ * @param nodeManager - Node Manager
+ * @param poolManager - Pool Manager
+ */
+ public ContainerSupervisor(Configuration conf, NodeManager nodeManager,
+ NodePoolManager poolManager) {
+ Preconditions.checkNotNull(poolManager);
+ Preconditions.checkNotNull(nodeManager);
+ this.containerProcessingLag =
+ conf.getTimeDuration(OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL,
+ OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT,
+ TimeUnit.SECONDS
+ ) * 1000;
+ int maxContainerReportThreads =
+ conf.getInt(OZONE_SCM_MAX_CONTAINER_REPORT_THREADS,
+ OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT
+ );
+ this.maxPoolWait =
+ conf.getTimeDuration(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT,
+ OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ this.inProgressPoolMaxCount = conf.getInt(
+ OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS,
+ OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT);
+ this.poolManager = poolManager;
+ this.nodeManager = nodeManager;
+ this.poolNames = new HashSet<>();
+ this.poolQueue = new PriorityQueue<>();
+ this.runnable = new AtomicBoolean(true);
+ this.threadFaultCount = new AtomicInteger(0);
+ this.executorService = newCachedThreadPool(
+ new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("Container Reports Processing Thread - %d")
+ .build(), maxContainerReportThreads);
+ this.inProgressPoolList = new LinkedList<>();
+ this.inProgressPoolListLock = new ReentrantReadWriteLock();
+
+ initPoolProcessThread();
+ }
+
+ private ExecutorService newCachedThreadPool(ThreadFactory threadFactory,
+ int maxThreads) {
+ return new HadoopThreadPoolExecutor(0, maxThreads, 60L, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(), threadFactory);
+ }
+
+ /**
+ * Returns the number of pools that are under process right now.
+ * @return int - Number of pools that are in process.
+ */
+ public int getInProgressPoolCount() {
+ return inProgressPoolList.size();
+ }
+
+ /**
+ * Exits the background thread.
+ */
+ public void setExit() {
+ this.runnable.set(false);
+ }
+
+ /**
+ * Adds or removes pools from names that we need to process.
+ *
+ * There are two different cases that we need to process.
+ * Sometimes pools are added, and at other times we have to handle the
+ * case where pools are removed.
+ */
+ private void refreshPools() {
+ List<String> pools = this.poolManager.getNodePools();
+ if (pools != null) {
+
+ HashSet<String> removedPools =
+ computePoolDifference(this.poolNames, new HashSet<>(pools));
+
+ HashSet<String> addedPools =
+ computePoolDifference(new HashSet<>(pools), this.poolNames);
+ // TODO: Support remove pool API in pool manager so that this code
+ // path can be tested. This never happens in the current code base.
+ for (String poolName : removedPools) {
+ for (PeriodicPool periodicPool : poolQueue) {
+ if (periodicPool.getPoolName().compareTo(poolName) == 0) {
+ poolQueue.remove(periodicPool);
+ }
+ }
+ }
+ // Remove the pool names that we have in the list.
+ this.poolNames.removeAll(removedPools);
+
+ for (String poolName : addedPools) {
+ poolQueue.add(new PeriodicPool(poolName));
+ }
+
+ // Add to the pool names we are tracking.
+ poolNames.addAll(addedPools);
+ }
+
+ }
+
+ /**
+ * Computes the set difference: pool names that are in newPools but not
+ * in oldPool.
+ *
+ * @param newPools - new pool names.
+ * @param oldPool - old pool names.
+ * @return pools present in newPools but absent from oldPool.
+ */
+ private HashSet<String> computePoolDifference(HashSet<String> newPools,
+ Set<String> oldPool) {
+ Preconditions.checkNotNull(newPools);
+ Preconditions.checkNotNull(oldPool);
+ HashSet<String> newSet = new HashSet<>(newPools);
+ newSet.removeAll(oldPool);
+ return newSet;
+ }
+
+ private void initPoolProcessThread() {
+
+ /*
+ * Task that runs to check if we need to start a pool processing job.
+ * If so, we create a pool reconciliation job and find out if all the
+ * expected containers are on the nodes.
+ */
+ Runnable processPools = () -> {
+ while (runnable.get()) {
+ // Make sure that we don't have any new pools.
+ refreshPools();
+ while (inProgressPoolList.size() < inProgressPoolMaxCount) {
+ PeriodicPool pool = poolQueue.poll();
+ if (pool != null) {
+ if (pool.getLastProcessedTime() + this.containerProcessingLag >
+ Time.monotonicNow()) {
+ LOG.debug("Not within the time window for processing: {}",
+ pool.getPoolName());
+ // we might over sleep here, not a big deal.
+ sleepUninterruptibly(this.containerProcessingLag,
+ TimeUnit.MILLISECONDS);
+ }
+ LOG.debug("Adding pool {} to container processing queue",
+ pool.getPoolName());
+ InProgressPool inProgressPool = new InProgressPool(maxPoolWait,
+ pool, this.nodeManager, this.poolManager, this.executorService);
+ inProgressPool.startReconciliation();
+ inProgressPoolListLock.writeLock().lock();
+ try {
+ inProgressPoolList.add(inProgressPool);
+ } finally {
+ inProgressPoolListLock.writeLock().unlock();
+ }
+ poolProcessCount++;
+ } else {
+ break;
+ }
+ }
+ sleepUninterruptibly(this.maxPoolWait, TimeUnit.MILLISECONDS);
+ inProgressPoolListLock.readLock().lock();
+ try {
+ for (InProgressPool inProgressPool : inProgressPoolList) {
+ inProgressPool.finalizeReconciliation();
+ poolQueue.add(inProgressPool.getPool());
+ }
+ } finally {
+ inProgressPoolListLock.readLock().unlock();
+ }
+ inProgressPoolListLock.writeLock().lock();
+ try {
+ inProgressPoolList.clear();
+ } finally {
+ inProgressPoolListLock.writeLock().unlock();
+ }
+ }
+ };
+
+ // We will have only one thread for pool processing.
+ Thread poolProcessThread = new Thread(processPools);
+ poolProcessThread.setDaemon(true);
+ poolProcessThread.setName("Pool replica thread");
+ poolProcessThread.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
+ // Restart pool processing on a fresh thread after logging a critical
+ // error; a terminated Thread instance cannot be started again. If no
+ // such thread is running we cannot process container reports.
+ LOG.error("Critical Error : Pool replica thread encountered an " +
+ "error. Thread: {} Error Count : {}", t.toString(),
+ threadFaultCount.incrementAndGet(), e);
+ initPoolProcessThread();
+ // TODO : Add a config to restrict how many times we will restart this
+ // thread in a single session.
+ });
+ poolProcessThread.start();
+ }
+
+ /**
+ * Routes a container report to the appropriate in-progress pool.
+ * @param datanodeDetails - Datanode that sent the report.
+ * @param containerReport -- Container report for a specific container from
+ * a datanode.
+ */
+ public void handleContainerReport(DatanodeDetails datanodeDetails,
+ ContainerReportsProto containerReport) {
+ inProgressPoolListLock.readLock().lock();
+ try {
+ String poolName = poolManager.getNodePool(datanodeDetails);
+ for (InProgressPool ppool : inProgressPoolList) {
+ if (ppool.getPoolName().equalsIgnoreCase(poolName)) {
+ ppool.handleContainerReport(datanodeDetails, containerReport);
+ return;
+ }
+ }
+ // TODO: Decide if we can do anything else with this report.
+ LOG.debug("Discarding the container report for pool {}. " +
+ "That pool is not currently in the pool reconciliation process." +
+ " Container Name: {}", poolName, datanodeDetails);
+ } catch (SCMException e) {
+ LOG.warn("Skipping processing container report from datanode {}, "
+ + "cause: failed to get the corresponding node pool",
+ datanodeDetails.toString(), e);
+ } finally {
+ inProgressPoolListLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Get in process pool list, used for testing.
+ * @return List of InProgressPool
+ */
+ @VisibleForTesting
+ public List<InProgressPool> getInProcessPoolList() {
+ return inProgressPoolList;
+ }
+
+ /**
+ * Shutdown the Container Replication Manager.
+ * @throws IOException if an I/O error occurs
+ */
+ @Override
+ public void close() throws IOException {
+ setExit();
+ HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS);
+ }
+}
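For context, a minimal usage sketch (not part of the patch): constructing the supervisor against mocked managers and routing one report through handleContainerReport(), the same call the hunk at the top of this diff now makes. The Mockito mocks and the DatanodeDetails builder fields are illustrative assumptions; in the SCM the real managers come from SCMNodeManager.

import java.util.UUID;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.scm.container.replication.ContainerSupervisor;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodePoolManager;
import org.mockito.Mockito;

public final class ContainerSupervisorSketch {
  public static void main(String[] args) throws Exception {
    NodeManager nodeManager = Mockito.mock(NodeManager.class);
    NodePoolManager poolManager = Mockito.mock(NodePoolManager.class);
    // The constructor starts the daemon "Pool replica thread".
    try (ContainerSupervisor supervisor = new ContainerSupervisor(
        new OzoneConfiguration(), nodeManager, poolManager)) {
      DatanodeDetails datanode = DatanodeDetails.newBuilder() // assumed builder API
          .setUuid(UUID.randomUUID().toString())
          .setHostName("localhost")
          .setIpAddress("127.0.0.1")
          .build();
      ContainerReportsProto report = ContainerReportsProto.getDefaultInstance();
      // Routed to the datanode's in-progress pool if that pool is currently
      // being reconciled; otherwise the report is logged and dropped.
      supervisor.handleContainerReport(datanode, report);
    }
  }
}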
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java
new file mode 100644
index 00000000000..4b547311dac
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodePoolManager;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerInfo;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * These are pools that are actively checking for replication status of the
+ * containers.
+ */
+public final class InProgressPool {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(InProgressPool.class);
+
+ private final PeriodicPool pool;
+ private final NodeManager nodeManager;
+ private final NodePoolManager poolManager;
+ private final ExecutorService executorService;
+ private final Map<Long, Integer> containerCountMap;
+ private final Map<UUID, Boolean> processedNodeSet;
+ private final long startTime;
+ private ProgressStatus status;
+ private AtomicInteger nodeCount;
+ private AtomicInteger nodeProcessed;
+ private AtomicInteger containerProcessedCount;
+ private long maxWaitTime;
+ /**
+ * Constructs a pool that is being processed.
+ * @param maxWaitTime - Maximum wait time in milliseconds.
+ * @param pool - Pool that we are working against
+ * @param nodeManager - Nodemanager
+ * @param poolManager - pool manager
+ * @param executorService - Shared Executor service.
+ */
+ InProgressPool(long maxWaitTime, PeriodicPool pool,
+ NodeManager nodeManager, NodePoolManager poolManager,
+ ExecutorService executorService) {
+ Preconditions.checkNotNull(pool);
+ Preconditions.checkNotNull(nodeManager);
+ Preconditions.checkNotNull(poolManager);
+ Preconditions.checkNotNull(executorService);
+ Preconditions.checkArgument(maxWaitTime > 0);
+ this.pool = pool;
+ this.nodeManager = nodeManager;
+ this.poolManager = poolManager;
+ this.executorService = executorService;
+ this.containerCountMap = new ConcurrentHashMap<>();
+ this.processedNodeSet = new ConcurrentHashMap<>();
+ this.maxWaitTime = maxWaitTime;
+ startTime = Time.monotonicNow();
+ }
+
+ /**
+ * Returns periodic pool.
+ *
+ * @return PeriodicPool
+ */
+ public PeriodicPool getPool() {
+ return pool;
+ }
+
+ /**
+ * We are done if we have got reports from all nodes or we are
+ * done waiting for the specified time.
+ *
+ * @return true if we are done, false otherwise.
+ */
+ public boolean isDone() {
+ return (nodeCount.get() == nodeProcessed.get()) ||
+ (this.startTime + this.maxWaitTime) < Time.monotonicNow();
+ }
+
+ /**
+ * Gets the number of containers processed.
+ *
+ * @return int
+ */
+ public int getContainerProcessedCount() {
+ return containerProcessedCount.get();
+ }
+
+ /**
+ * Returns the start time in milliseconds.
+ *
+ * @return - Start Time.
+ */
+ public long getStartTime() {
+ return startTime;
+ }
+
+ /**
+ * Get the number of nodes in this pool.
+ *
+ * @return - node count
+ */
+ public int getNodeCount() {
+ return nodeCount.get();
+ }
+
+ /**
+ * Get the number of nodes that we have already processed container reports
+ * from.
+ *
+ * @return - Processed count.
+ */
+ public int getNodeProcessed() {
+ return nodeProcessed.get();
+ }
+
+ /**
+ * Returns the current status.
+ *
+ * @return Status
+ */
+ public ProgressStatus getStatus() {
+ return status;
+ }
+
+ /**
+ * Starts the reconciliation process for all the nodes in the pool.
+ */
+ public void startReconciliation() {
+ List<DatanodeDetails> datanodeDetailsList =
+ this.poolManager.getNodes(pool.getPoolName());
+ if (datanodeDetailsList.size() == 0) {
+ LOG.error("Datanode list for {} is Empty. Pool with no nodes ? ",
+ pool.getPoolName());
+ this.status = ProgressStatus.Error;
+ return;
+ }
+
+ nodeProcessed = new AtomicInteger(0);
+ containerProcessedCount = new AtomicInteger(0);
+ nodeCount = new AtomicInteger(0);
+ this.status = ProgressStatus.InProgress;
+ this.getPool().setLastProcessedTime(Time.monotonicNow());
+ }
+
+ /**
+ * Queues a container report for handling. This is done in a worker thread
+ * since decoding a container report might be compute intensive. We don't
+ * want to block since we have asked for a bunch of container reports
+ * from a set of datanodes.
+ *
+ * @param datanodeDetails - Datanode that sent the report.
+ * @param containerReport - ContainerReport
+ */
+ public void handleContainerReport(DatanodeDetails datanodeDetails,
+ ContainerReportsProto containerReport) {
+ if (status == ProgressStatus.InProgress) {
+ executorService.submit(processContainerReport(datanodeDetails,
+ containerReport));
+ } else {
+ LOG.debug("Cannot handle container report when the pool is in {} status.",
+ status);
+ }
+ }
+
+ private Runnable processContainerReport(DatanodeDetails datanodeDetails,
+ ContainerReportsProto reports) {
+ return () -> {
+ // Count each datanode only once per reconciliation pass.
+ if (processedNodeSet.putIfAbsent(datanodeDetails.getUuid(),
+ Boolean.TRUE) == null) {
+ nodeProcessed.incrementAndGet();
+ LOG.debug("Total Nodes processed : {} Node Name: {} ", nodeProcessed,
+ datanodeDetails.getUuid());
+ for (ContainerInfo info : reports.getReportsList()) {
+ containerProcessedCount.incrementAndGet();
+ LOG.debug("Total Containers processed: {} Container Name: {}",
+ containerProcessedCount.get(), info.getContainerID());
+
+ // Update the container map with count + 1 if the key exists or
+ // update the map with 1. Since this is a concurrentMap the
+ // computation and update is atomic.
+ containerCountMap.merge(info.getContainerID(), 1, Integer::sum);
+ }
+ }
+ };
+ }
+
+ /**
+ * Filter the containers based on specific rules.
+ *
+ * @param predicate -- Predicate to filter by
+ * @return A list of map entries.
+ */
+ public List<Map.Entry<Long, Integer>> filterContainer(
+ Predicate<Map.Entry<Long, Integer>> predicate) {
+ return containerCountMap.entrySet().stream()
+ .filter(predicate).collect(Collectors.toList());
+ }
+
+ /**
+ * Used only for testing, calling this will abort container report
+ * processing. This is a very dangerous call and should not be made by any user.
+ */
+ @VisibleForTesting
+ public void setDoneProcessing() {
+ nodeProcessed.set(nodeCount.get());
+ }
+
+ /**
+ * Returns the pool name.
+ *
+ * @return Name of the pool.
+ */
+ String getPoolName() {
+ return pool.getPoolName();
+ }
+
+ public void finalizeReconciliation() {
+ status = ProgressStatus.Done;
+ //TODO: Add finalizing logic. This is where actual reconciliation happens.
+ }
+
+ /**
+ * Current status of the computing replication status.
+ */
+ public enum ProgressStatus {
+ InProgress, Done, Error
+ }
+}
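A hedged sketch of how a caller holding an InProgressPool might consume the per-container counts collected above; the expected replication factor is an assumed parameter, since finalizeReconciliation() is still a TODO in this patch.

import java.util.List;
import java.util.Map;

import org.apache.hadoop.hdds.scm.container.replication.InProgressPool;

public final class ReplicaCountSketch {
  /**
   * Returns (containerID, observed replica count) entries that were reported
   * by fewer datanodes than expected. The threshold is a stand-in; the actual
   * reconciliation policy is not part of this patch.
   */
  static List<Map.Entry<Long, Integer>> underReplicated(InProgressPool pool,
      int expectedReplicas) {
    return pool.filterContainer(entry -> entry.getValue() < expectedReplicas);
  }
}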
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/PeriodicPool.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/PeriodicPool.java
new file mode 100644
index 00000000000..ef28aa78d01
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/PeriodicPool.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Periodic pool is a pool with a timestamp; this allows us to process pools
+ * based on a cyclic clock.
+ */
+public class PeriodicPool implements Comparable<PeriodicPool> {
+ private final String poolName;
+ private long lastProcessedTime;
+ private AtomicLong totalProcessedCount;
+
+ /**
+ * Constructs a periodic pool.
+ *
+ * @param poolName - Name of the pool
+ */
+ public PeriodicPool(String poolName) {
+ this.poolName = poolName;
+ lastProcessedTime = 0;
+ totalProcessedCount = new AtomicLong(0);
+ }
+
+ /**
+ * Get pool Name.
+ * @return PoolName
+ */
+ public String getPoolName() {
+ return poolName;
+ }
+
+ /**
+ * Compares this object with the specified object for order. Returns a
+ * negative integer, zero, or a positive integer as this object is less
+ * than, equal to, or greater than the specified object.
+ *
+ * @param o the object to be compared.
+ * @return a negative integer, zero, or a positive integer as this object is
+ * less than, equal to, or greater than the specified object.
+ * @throws NullPointerException if the specified object is null
+ * @throws ClassCastException if the specified object's type prevents it
+ * from being compared to this object.
+ */
+ @Override
+ public int compareTo(PeriodicPool o) {
+ return Long.compare(this.lastProcessedTime, o.lastProcessedTime);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ PeriodicPool that = (PeriodicPool) o;
+
+ return poolName.equals(that.poolName);
+ }
+
+ @Override
+ public int hashCode() {
+ return poolName.hashCode();
+ }
+
+ /**
+ * Returns the Total Times we have processed this pool.
+ *
+ * @return processed count.
+ */
+ public long getTotalProcessedCount() {
+ return totalProcessedCount.get();
+ }
+
+ /**
+ * Gets the last time we processed this pool.
+ * @return time in milliseconds
+ */
+ public long getLastProcessedTime() {
+ return this.lastProcessedTime;
+ }
+
+
+ /**
+ * Sets the last processed time.
+ *
+ * @param lastProcessedTime - Long in milliseconds.
+ */
+
+ public void setLastProcessedTime(long lastProcessedTime) {
+ this.lastProcessedTime = lastProcessedTime;
+ }
+
+ /*
+ * Increments the total processed count.
+ */
+ public void incTotalProcessedCount() {
+ this.totalProcessedCount.incrementAndGet();
+ }
+}
\ No newline at end of file
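A small sketch of why PeriodicPool is Comparable: ContainerSupervisor keeps pools in a PriorityQueue, so ordering by lastProcessedTime means the least recently processed pool is polled first. Pool names and timestamps are illustrative.

import java.util.PriorityQueue;

import org.apache.hadoop.hdds.scm.container.replication.PeriodicPool;

public final class PoolOrderingSketch {
  public static void main(String[] args) {
    PriorityQueue<PeriodicPool> queue = new PriorityQueue<>();
    PeriodicPool recent = new PeriodicPool("pool-recent");
    PeriodicPool stale = new PeriodicPool("pool-stale");
    recent.setLastProcessedTime(2000L);
    stale.setLastProcessedTime(1000L);
    queue.add(recent);
    queue.add(stale);
    // Prints "pool-stale": the smallest lastProcessedTime sorts first.
    System.out.println(queue.poll().getPoolName());
  }
}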
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java
new file mode 100644
index 00000000000..7bbe2efe578
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+/*
+ This package contains routines that manage replication of a container. This
+ relies on container reports to understand the replication level of a
+ container - UnderReplicated, Replicated, OverReplicated -- and manages the
+ replication level based on that.
+ */
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index 72d7e946cc0..4392633b16f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -123,6 +123,12 @@ public interface NodeManager extends StorageContainerNodeProtocol,
*/
SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails);
+ /**
+ * Returns the NodePoolManager associated with the NodeManager.
+ * @return NodePoolManager
+ */
+ NodePoolManager getNodePoolManager();
+
/**
* Wait for the heartbeat is processed by NodeManager.
* @return true if heartbeat has been processed.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodePoolManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodePoolManager.java
new file mode 100644
index 00000000000..46faf9ca4d9
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodePoolManager.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Interface that defines SCM NodePoolManager.
+ */
+public interface NodePoolManager extends Closeable {
+
+ /**
+ * Add a node to a node pool.
+ * @param pool - name of the node pool.
+ * @param node - data node.
+ */
+ void addNode(String pool, DatanodeDetails node) throws IOException;
+
+ /**
+ * Remove a node from a node pool.
+ * @param pool - name of the node pool.
+ * @param node - data node.
+ * @throws SCMException
+ */
+ void removeNode(String pool, DatanodeDetails node)
+ throws SCMException;
+
+ /**
+ * Get a list of known node pools.
+ * @return a list of known node pool names or an empty list if no node pool
+ * is defined.
+ */
+ List<String> getNodePools();
+
+ /**
+ * Get all nodes of a node pool given the name of the node pool.
+ * @param pool - name of the node pool.
+ * @return a list of datanode ids or an empty list if the node pool was not
+ * found.
+ */
+ List<DatanodeDetails> getNodes(String pool);
+
+ /**
+ * Get the node pool name if the node has been added to a node pool.
+ * @param datanodeDetails - datanode ID.
+ * @return node pool name if it has been assigned.
+ * null if the node has not been assigned to any node pool yet.
+ */
+ String getNodePool(DatanodeDetails datanodeDetails) throws SCMException;
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index adca8eae0c6..fc8b0137f3b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import com.google.protobuf.GeneratedMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -158,6 +159,7 @@ public class SCMNodeManager
private ObjectName nmInfoBean;
// Node pool manager.
+ private final SCMNodePoolManager nodePoolManager;
private final StorageContainerManager scmManager;
public static final Event DATANODE_COMMAND =
@@ -208,6 +210,7 @@ public class SCMNodeManager
registerMXBean();
+ this.nodePoolManager = new SCMNodePoolManager(conf);
this.scmManager = scmManager;
}
@@ -679,6 +682,7 @@ public class SCMNodeManager
@Override
public void close() throws IOException {
unregisterMXBean();
+ nodePoolManager.close();
executorService.shutdown();
try {
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
@@ -756,6 +760,20 @@ public class SCMNodeManager
LOG.info("Leaving startup chill mode.");
}
+ // TODO: define node pool policy for non-default node pool.
+ // For now, all nodes are added to the "DefaultNodePool" upon registration
+ // if it has not been added to any node pool yet.
+ try {
+ if (nodePoolManager.getNodePool(datanodeDetails) == null) {
+ nodePoolManager.addNode(SCMNodePoolManager.DEFAULT_NODEPOOL,
+ datanodeDetails);
+ }
+ } catch (IOException e) {
+ // TODO: make sure registration failure is handled correctly.
+ return RegisteredCommand.newBuilder()
+ .setErrorCode(ErrorCode.errorNodeNotPermitted)
+ .build();
+ }
// Updating Node Report, as registration is successful
updateNodeStat(datanodeDetails.getUuid(), nodeReport);
LOG.info("Data node with ID: {} Registered.",
@@ -841,6 +859,11 @@ public class SCMNodeManager
return new SCMNodeMetric(nodeStats.get(datanodeDetails.getUuid()));
}
+ @Override
+ public NodePoolManager getNodePoolManager() {
+ return nodePoolManager;
+ }
+
@Override
public Map<String, Integer> getNodeCount() {
Map<String, Integer> nodeCountMap = new HashMap<String, Integer>();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodePoolManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodePoolManager.java
new file mode 100644
index 00000000000..faf330ea1d2
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodePoolManager.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+ .OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+ .OZONE_SCM_DB_CACHE_SIZE_MB;
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
+ .FAILED_TO_FIND_NODE_IN_POOL;
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
+ .FAILED_TO_LOAD_NODEPOOL;
+import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
+import static org.apache.hadoop.ozone.OzoneConsts.NODEPOOL_DB;
+
+/**
+ * SCM node pool manager that manages node pools.
+ */
+public final class SCMNodePoolManager implements NodePoolManager {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SCMNodePoolManager.class);
+ private static final List<DatanodeDetails> EMPTY_NODE_LIST =
+ new ArrayList<>();
+ private static final List<String> EMPTY_NODEPOOL_LIST = new ArrayList<>();
+ public static final String DEFAULT_NODEPOOL = "DefaultNodePool";
+
+ // DB that saves the node to node pool mapping.
+ private MetadataStore nodePoolStore;
+
+ // In-memory node pool to nodes mapping
+ private HashMap<String, Set<DatanodeDetails>> nodePools;
+
+ // Read-write lock for nodepool operations
+ private ReadWriteLock lock;
+
+ /**
+ * Construct SCMNodePoolManager class that manages node to node pool mapping.
+ * @param conf - configuration.
+ * @throws IOException
+ */
+ public SCMNodePoolManager(final OzoneConfiguration conf)
+ throws IOException {
+ final int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
+ OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
+ File metaDir = getOzoneMetaDirPath(conf);
+ String scmMetaDataDir = metaDir.getPath();
+ File nodePoolDBPath = new File(scmMetaDataDir, NODEPOOL_DB);
+ nodePoolStore = MetadataStoreBuilder.newBuilder()
+ .setConf(conf)
+ .setDbFile(nodePoolDBPath)
+ .setCacheSize(cacheSize * OzoneConsts.MB)
+ .build();
+ nodePools = new HashMap<>();
+ lock = new ReentrantReadWriteLock();
+ init();
+ }
+
+ /**
+ * Initialize the in-memory store based on the persistent store (LevelDB).
+ * No lock is needed as init() is only invoked by constructor.
+ * @throws SCMException
+ */
+ private void init() throws SCMException {
+ try {
+ nodePoolStore.iterate(null, (key, value) -> {
+ try {
+ DatanodeDetails nodeId = DatanodeDetails.getFromProtoBuf(
+ HddsProtos.DatanodeDetailsProto.PARSER.parseFrom(key));
+ String poolName = DFSUtil.bytes2String(value);
+
+ Set<DatanodeDetails> nodePool = null;
+ if (nodePools.containsKey(poolName)) {
+ nodePool = nodePools.get(poolName);
+ } else {
+ nodePool = new HashSet<>();
+ nodePools.put(poolName, nodePool);
+ }
+ nodePool.add(nodeId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding node: {} to node pool: {}",
+ nodeId, poolName);
+ }
+ } catch (IOException e) {
+ LOG.warn("Can't add a datanode to node pool, continue next...");
+ }
+ return true;
+ });
+ } catch (IOException e) {
+ LOG.error("Loading node pool error " + e);
+ throw new SCMException("Failed to load node pool",
+ FAILED_TO_LOAD_NODEPOOL);
+ }
+ }
+
+ /**
+ * Add a datanode to a node pool.
+ * @param pool - name of the node pool.
+ * @param node - name of the datanode.
+ */
+ @Override
+ public void addNode(final String pool, final DatanodeDetails node)
+ throws IOException {
+ Preconditions.checkNotNull(pool, "pool name is null");
+ Preconditions.checkNotNull(node, "node is null");
+ lock.writeLock().lock();
+ try {
+ // add to the persistent store
+ nodePoolStore.put(node.getProtoBufMessage().toByteArray(),
+ DFSUtil.string2Bytes(pool));
+
+ // add to the in-memory store
+ Set<DatanodeDetails> nodePool = null;
+ if (nodePools.containsKey(pool)) {
+ nodePool = nodePools.get(pool);
+ } else {
+ nodePool = new HashSet<>();
+ nodePools.put(pool, nodePool);
+ }
+ nodePool.add(node);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Remove a datanode from a node pool.
+ * @param pool - name of the node pool.
+ * @param node - datanode id.
+ * @throws SCMException
+ */
+ @Override
+ public void removeNode(final String pool, final DatanodeDetails node)
+ throws SCMException {
+ Preconditions.checkNotNull(pool, "pool name is null");
+ Preconditions.checkNotNull(node, "node is null");
+ lock.writeLock().lock();
+ try {
+ // Remove from the persistent store
+ byte[] kName = node.getProtoBufMessage().toByteArray();
+ byte[] kData = nodePoolStore.get(kName);
+ if (kData == null) {
+ throw new SCMException(String.format("Unable to find node %s from" +
+ " pool %s in DB.", DFSUtil.bytes2String(kName), pool),
+ FAILED_TO_FIND_NODE_IN_POOL);
+ }
+ nodePoolStore.delete(kName);
+
+ // Remove from the in-memory store
+ if (nodePools.containsKey(pool)) {
+ Set<DatanodeDetails> nodePool = nodePools.get(pool);
+ nodePool.remove(node);
+ } else {
+ throw new SCMException(String.format("Unable to find node %s from" +
+ " pool %s in MAP.", DFSUtil.bytes2String(kName), pool),
+ FAILED_TO_FIND_NODE_IN_POOL);
+ }
+ } catch (IOException e) {
+ throw new SCMException("Failed to remove node " + node.toString()
+ + " from node pool " + pool, e,
+ SCMException.ResultCodes.IO_EXCEPTION);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Get all the node pools.
+ * @return all the node pools.
+ */
+ @Override
+ public List<String> getNodePools() {
+ lock.readLock().lock();
+ try {
+ if (!nodePools.isEmpty()) {
+ return nodePools.keySet().stream().collect(Collectors.toList());
+ } else {
+ return EMPTY_NODEPOOL_LIST;
+ }
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Get all datanodes of a specific node pool.
+ * @param pool - name of the node pool.
+ * @return all datanodes of the specified node pool.
+ */
+ @Override
+ public List<DatanodeDetails> getNodes(final String pool) {
+ Preconditions.checkNotNull(pool, "pool name is null");
+ if (nodePools.containsKey(pool)) {
+ return nodePools.get(pool).stream().collect(Collectors.toList());
+ } else {
+ return EMPTY_NODE_LIST;
+ }
+ }
+
+ /**
+ * Get the node pool name if the node has been added to a node pool.
+ * @param datanodeDetails - datanode ID.
+ * @return node pool name if it has been assigned.
+ * null if the node has not been assigned to any node pool yet.
+ * TODO: Put this in an in-memory map if performance is an issue.
+ */
+ @Override
+ public String getNodePool(final DatanodeDetails datanodeDetails)
+ throws SCMException {
+ Preconditions.checkNotNull(datanodeDetails, "node is null");
+ try {
+ byte[] result = nodePoolStore.get(
+ datanodeDetails.getProtoBufMessage().toByteArray());
+ return result == null ? null : DFSUtil.bytes2String(result);
+ } catch (IOException e) {
+ throw new SCMException("Failed to get node pool for node "
+ + datanodeDetails.toString(), e,
+ SCMException.ResultCodes.IO_EXCEPTION);
+ }
+ }
+
+ /**
+ * Close node pool level db store.
+ * @throws IOException
+ */
+ @Override
+ public void close() throws IOException {
+ nodePoolStore.close();
+ }
+}
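A hedged sketch of the lookup-then-assign flow that the SCMNodeManager registration hunk above performs, run here directly against SCMNodePoolManager with a temporary metadata directory; the DatanodeDetails builder fields and the temp-dir setup are assumptions for illustration.

import java.nio.file.Files;
import java.util.UUID;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.node.SCMNodePoolManager;
import org.apache.hadoop.ozone.OzoneConfigKeys;

public final class NodePoolAssignmentSketch {
  public static void main(String[] args) throws Exception {
    OzoneConfiguration conf = new OzoneConfiguration();
    conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS,
        Files.createTempDirectory("scm-meta").toString());
    DatanodeDetails datanode = DatanodeDetails.newBuilder() // assumed builder API
        .setUuid(UUID.randomUUID().toString())
        .setHostName("localhost")
        .setIpAddress("127.0.0.1")
        .build();
    try (SCMNodePoolManager poolManager = new SCMNodePoolManager(conf)) {
      // Same check-then-add logic as the registration path in SCMNodeManager.
      if (poolManager.getNodePool(datanode) == null) {
        poolManager.addNode(SCMNodePoolManager.DEFAULT_NODEPOOL, datanode);
      }
      System.out.println(poolManager.getNodePool(datanode)); // DefaultNodePool
    }
  }
}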
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index 80b5d6e182c..8c59462b407 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.container;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodePoolManager;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -272,6 +273,11 @@ public class MockNodeManager implements NodeManager {
return new SCMNodeMetric(nodeMetricMap.get(datanodeDetails.getUuid()));
}
+ @Override
+ public NodePoolManager getNodePoolManager() {
+ return Mockito.mock(NodePoolManager.class);
+ }
+
/**
* Used for testing.
*
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodePoolManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodePoolManager.java
new file mode 100644
index 00000000000..8f412deddad
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodePoolManager.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.commons.collections.ListUtils;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+ .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+ .SCMContainerPlacementCapacity;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.test.PathUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for SCM node pool manager.
+ */
+public class TestSCMNodePoolManager {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestSCMNodePoolManager.class);
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ private final File testDir = PathUtils.getTestDir(
+ TestSCMNodePoolManager.class);
+
+ SCMNodePoolManager createNodePoolManager(OzoneConfiguration conf)
+ throws IOException {
+ conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS,
+ testDir.getAbsolutePath());
+ conf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
+ SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
+ return new SCMNodePoolManager(conf);
+ }
+
+ /**
+ * Test default node pool.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testDefaultNodePool() throws IOException {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ try {
+ final String defaultPool = "DefaultPool";
+ NodePoolManager npMgr = createNodePoolManager(conf);
+
+ final int nodeCount = 4;
+ final List<DatanodeDetails> nodes = TestUtils
+ .getListOfDatanodeDetails(nodeCount);
+ assertEquals(0, npMgr.getNodePools().size());
+ for (DatanodeDetails node: nodes) {
+ npMgr.addNode(defaultPool, node);
+ }
+ List<DatanodeDetails> nodesRetrieved = npMgr.getNodes(defaultPool);
+ assertEquals(nodeCount, nodesRetrieved.size());
+ assertTwoDatanodeListsEqual(nodes, nodesRetrieved);
+
+ DatanodeDetails nodeRemoved = nodes.remove(2);
+ npMgr.removeNode(defaultPool, nodeRemoved);
+ List<DatanodeDetails> nodesAfterRemove = npMgr.getNodes(defaultPool);
+ assertTwoDatanodeListsEqual(nodes, nodesAfterRemove);
+
+ List<DatanodeDetails> nonExistSet = npMgr.getNodes("NonExistSet");
+ assertEquals(0, nonExistSet.size());
+ } finally {
+ FileUtil.fullyDelete(testDir);
+ }
+ }
+
+
+ /**
+ * Test default node pool reload.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testDefaultNodePoolReload() throws IOException {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ final String defaultPool = "DefaultPool";
+ final int nodeCount = 4;
+ final List<DatanodeDetails> nodes = TestUtils
+ .getListOfDatanodeDetails(nodeCount);
+
+ try {
+ try {
+ SCMNodePoolManager npMgr = createNodePoolManager(conf);
+ assertEquals(0, npMgr.getNodePools().size());
+ for (DatanodeDetails node : nodes) {
+ npMgr.addNode(defaultPool, node);
+ }
+ List<DatanodeDetails> nodesRetrieved = npMgr.getNodes(defaultPool);
+ assertEquals(nodeCount, nodesRetrieved.size());
+ assertTwoDatanodeListsEqual(nodes, nodesRetrieved);
+ npMgr.close();
+ } finally {
+ LOG.info("testDefaultNodePoolReload: Finish adding nodes to pool" +
+ " and close.");
+ }
+
+ // try reload with a new NodePoolManager instance
+ try {
+ SCMNodePoolManager npMgr = createNodePoolManager(conf);
+ List<DatanodeDetails> nodesRetrieved = npMgr.getNodes(defaultPool);
+ assertEquals(nodeCount, nodesRetrieved.size());
+ assertTwoDatanodeListsEqual(nodes, nodesRetrieved);
+ } finally {
+ LOG.info("testDefaultNodePoolReload: Finish reloading node pool.");
+ }
+ } finally {
+ FileUtil.fullyDelete(testDir);
+ }
+ }
+
+ /**
+ * Compare and verify that two datanode lists are equal.
+ * @param list1 - datanode list 1.
+ * @param list2 - datanode list 2.
+ */
+ private void assertTwoDatanodeListsEqual(List<DatanodeDetails> list1,
+ List<DatanodeDetails> list2) {
+ assertEquals(list1.size(), list2.size());
+ Collections.sort(list1);
+ Collections.sort(list2);
+ assertTrue(ListUtils.isEqualList(list1, list2));
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index 1a4dcd7ad20..072d8212470 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.scm.node.CommandQueue;
import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodePoolManager;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
@@ -200,6 +201,10 @@ public class ReplicationNodeManagerMock implements NodeManager {
return null;
}
+ @Override
+ public NodePoolManager getNodePoolManager() {
+ return Mockito.mock(NodePoolManager.class);
+ }
/**
* Wait for the heartbeat is processed by NodeManager.
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodePoolManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodePoolManagerMock.java
new file mode 100644
index 00000000000..ffcd752e84a
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodePoolManagerMock.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.testutils;
+
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodePoolManager;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Pool Manager replication mock.
+ */
+public class ReplicationNodePoolManagerMock implements NodePoolManager {
+
+ private final Map<DatanodeDetails, String> nodeMemberShip;
+
+ /**
+ * A node pool manager for testing.
+ */
+ public ReplicationNodePoolManagerMock() {
+ nodeMemberShip = new HashMap<>();
+ }
+
+ /**
+ * Add a node to a node pool.
+ *
+ * @param pool - name of the node pool.
+ * @param node - data node.
+ */
+ @Override
+ public void addNode(String pool, DatanodeDetails node) {
+ nodeMemberShip.put(node, pool);
+ }
+
+ /**
+ * Remove a node from a node pool.
+ *
+ * @param pool - name of the node pool.
+ * @param node - data node.
+ * @throws SCMException
+ */
+ @Override
+ public void removeNode(String pool, DatanodeDetails node)
+ throws SCMException {
+ nodeMemberShip.remove(node);
+
+ }
+
+ /**
+ * Get a list of known node pools.
+ *
+ * @return a list of known node pool names or an empty list if no node pool
+ * is defined.
+ */
+ @Override
+ public List<String> getNodePools() {
+ Set<String> poolSet = new HashSet<>();
+ for (Map.Entry<DatanodeDetails, String> entry : nodeMemberShip.entrySet()) {
+ poolSet.add(entry.getValue());
+ }
+ return new ArrayList<>(poolSet);
+
+ }
+
+ /**
+ * Get all nodes of a node pool given the name of the node pool.
+ *
+ * @param pool - name of the node pool.
+ * @return a list of datanode ids or an empty list if the node pool was not
+ * found.
+ */
+ @Override
+ public List<DatanodeDetails> getNodes(String pool) {
+ Set<DatanodeDetails> datanodeSet = new HashSet<>();
+ for (Map.Entry<DatanodeDetails, String> entry : nodeMemberShip.entrySet()) {
+ if (entry.getValue().equals(pool)) {
+ datanodeSet.add(entry.getKey());
+ }
+ }
+ return new ArrayList<>(datanodeSet);
+ }
+
+ /**
+ * Get the node pool name if the node has been added to a node pool.
+ *
+ * @param datanodeDetails DatanodeDetails.
+ * @return node pool name if it has been assigned. null if the node has not
+ * been assigned to any node pool yet.
+ */
+ @Override
+ public String getNodePool(DatanodeDetails datanodeDetails) {
+ return nodeMemberShip.get(datanodeDetails);
+ }
+
+ /**
+ * Closes this stream and releases any system resources associated
+ * with it. If the stream is already closed then invoking this
+ * method has no effect.
+ *
+ *
+ * <p> As noted in {@link AutoCloseable#close()}, cases where the
+ * close may fail require careful attention. It is strongly advised
+ * to relinquish the underlying resources and to internally
+ * mark the {@code Closeable} as closed, prior to throwing
+ * the {@code IOException}.
+ *
+ * @throws IOException if an I/O error occurs
+ */
+ @Override
+ public void close() throws IOException {
+
+ }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java
index b4ed2b12c2f..4d70af84a21 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java
@@ -51,9 +51,12 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.UUID;
+import static org.apache.hadoop.ozone.OzoneConsts.BLOCK_DB;
import static org.apache.hadoop.ozone.OzoneConsts.SCM_CONTAINER_DB;
import static org.apache.hadoop.ozone.OzoneConsts.KB;
+import static org.apache.hadoop.ozone.OzoneConsts.NODEPOOL_DB;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
/**
* This class tests the CLI that transforms container into SQLite DB files.
@@ -173,6 +176,34 @@ public class TestContainerSQLCli {
}
}
+ @Test
+ public void testConvertNodepoolDB() throws Exception {
+ String dbOutPath = GenericTestUtils.getTempPath(
+ UUID.randomUUID() + "/out_sql.db");
+ String dbRootPath = conf.get(OzoneConfigKeys.OZONE_METADATA_DIRS);
+ String dbPath = dbRootPath + "/" + NODEPOOL_DB;
+ String[] args = {"-p", dbPath, "-o", dbOutPath};
+
+ cli.run(args);
+
+ // verify the sqlite db
+ HashMap<String, String> expectedPool = new HashMap<>();
+ for (DatanodeDetails dnid : nodeManager.getAllNodes()) {
+ expectedPool.put(dnid.getUuidString(), "DefaultNodePool");
+ }
+ Connection conn = connectDB(dbOutPath);
+ String sql = "SELECT * FROM nodePool";
+ ResultSet rs = executeQuery(conn, sql);
+ while(rs.next()) {
+ String datanodeUUID = rs.getString("datanodeUUID");
+ String poolName = rs.getString("poolName");
+ assertTrue(expectedPool.remove(datanodeUUID).equals(poolName));
+ }
+ assertEquals(0, expectedPool.size());
+
+ Files.delete(Paths.get(dbOutPath));
+ }
+
@Test
public void testConvertContainerDB() throws Exception {
String dbOutPath = GenericTestUtils.getTempPath(
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
index edc0d7b597b..2bd43fb93af 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.scm.cli;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
+import com.google.protobuf.ByteString;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
@@ -59,11 +60,13 @@ import java.sql.Statement;
import java.util.HashSet;
import java.util.Set;
+import static org.apache.hadoop.ozone.OzoneConsts.BLOCK_DB;
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB_SUFFIX;
import static org.apache.hadoop.ozone.OzoneConsts.KSM_DB_NAME;
import static org.apache.hadoop.ozone.OzoneConsts.KSM_USER_PREFIX;
import static org.apache.hadoop.ozone.OzoneConsts.KSM_BUCKET_PREFIX;
import static org.apache.hadoop.ozone.OzoneConsts.KSM_VOLUME_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.NODEPOOL_DB;
import static org.apache.hadoop.ozone.OzoneConsts.OPEN_CONTAINERS_DB;
/**
@@ -108,6 +111,15 @@ public class SQLCLI extends Configured implements Tool {
private static final String INSERT_CONTAINER_MEMBERS =
"INSERT INTO containerMembers (containerName, datanodeUUID) " +
"VALUES (\"%s\", \"%s\")";
+ // for nodepool.db
+ private static final String CREATE_NODE_POOL =
+ "CREATE TABLE nodePool (" +
+ "datanodeUUID TEXT NOT NULL," +
+ "poolName TEXT NOT NULL," +
+ "PRIMARY KEY(datanodeUUID, poolName))";
+ private static final String INSERT_NODE_POOL =
+ "INSERT INTO nodePool (datanodeUUID, poolName) " +
+ "VALUES (\"%s\", \"%s\")";
// and reuse CREATE_DATANODE_INFO and INSERT_DATANODE_INFO
// for openContainer.db
private static final String CREATE_OPEN_CONTAINER =
@@ -273,6 +285,9 @@ public class SQLCLI extends Configured implements Tool {
if (dbName.toString().endsWith(CONTAINER_DB_SUFFIX)) {
LOG.info("Converting container DB");
convertContainerDB(dbPath, outPath);
+ } else if (dbName.toString().equals(NODEPOOL_DB)) {
+ LOG.info("Converting node pool DB");
+ convertNodePoolDB(dbPath, outPath);
} else if (dbName.toString().equals(OPEN_CONTAINERS_DB)) {
LOG.info("Converting open container DB");
convertOpenContainerDB(dbPath, outPath);
@@ -528,7 +543,66 @@ public class SQLCLI extends Configured implements Tool {
}
LOG.info("Insertion completed.");
}
+ /**
+ * Converts nodePool.db to sqlite. The schema of sql db:
+ * two tables, nodePool and datanodeInfo (the same datanode Info as for
+ * container.db).
+ *
+ * nodePool
+ * ---------------------------------------------------------
+ * datanodeUUID* | poolName*
+ * ---------------------------------------------------------
+ *
+ * datanodeInfo:
+ * ---------------------------------------------------------
+ * hostname | datanodeUUid* | xferPort | ipcPort
+ * ---------------------------------------------------------
+ *
+ * --------------------------------
+ * |containerPort
+ * --------------------------------
+ *
+ * @param dbPath path to nodepool db.
+ * @param outPath path to output sqlite
+ * @throws Exception throws exception.
+ */
+ private void convertNodePoolDB(Path dbPath, Path outPath) throws Exception {
+ LOG.info("Create table for sql node pool db.");
+ File dbFile = dbPath.toFile();
+ try (MetadataStore dbStore = MetadataStoreBuilder.newBuilder()
+ .setConf(conf).setDbFile(dbFile).build();
+ Connection conn = connectDB(outPath.toString())) {
+ executeSQL(conn, CREATE_NODE_POOL);
+ executeSQL(conn, CREATE_DATANODE_INFO);
+ dbStore.iterate(null, (key, value) -> {
+ DatanodeDetails nodeId = DatanodeDetails
+ .getFromProtoBuf(HddsProtos.DatanodeDetailsProto
+ .PARSER.parseFrom(key));
+ String blockPool = DFSUtil.bytes2String(value);
+ try {
+ insertNodePoolDB(conn, blockPool, nodeId);
+ return true;
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ });
+ }
+ }
+
+ private void insertNodePoolDB(Connection conn, String blockPool,
+ DatanodeDetails datanodeDetails) throws SQLException {
+ String insertNodePool = String.format(INSERT_NODE_POOL,
+ datanodeDetails.getUuidString(), blockPool);
+ executeSQL(conn, insertNodePool);
+
+ String insertDatanodeDetails = String
+ .format(INSERT_DATANODE_INFO, datanodeDetails.getHostName(),
+ datanodeDetails.getUuidString(), datanodeDetails.getIpAddress(),
+ datanodeDetails.getPort(DatanodeDetails.Port.Name.STANDALONE)
+ .getValue());
+ executeSQL(conn, insertDatanodeDetails);
+ }
/**
* Convert openContainer.db to sqlite db file. This is rather simple db,