diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java index 6f5c8737c43..fbe96373191 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java @@ -223,6 +223,12 @@ public final class ScmConfigKeys { public static final String OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT = "60s"; + /** + * This determines the total number of pools to be processed in parallel. + */ + public static final String OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS = + "ozone.scm.max.nodepool.processing.threads"; + public static final int OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT = 1; /** * These 2 settings control the number of threads in executor pool and time * outs for the container reports from all nodes. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java index 3276db8d4a0..4245fe09ff8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java @@ -984,9 +984,7 @@ public ContainerReportsResponseProto sendContainerReport( updateContainerReportMetrics(reports); // should we process container reports async? - scmContainerManager.processContainerReports( - DatanodeID.getFromProtoBuf(reports.getDatanodeID()), - reports.getType(), reports.getReportsList()); + scmContainerManager.processContainerReports(reports); return ContainerReportsResponseProto.newBuilder().build(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java index fe86064920c..8a82c821628 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java @@ -20,7 +20,6 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.lease.Lease; import org.apache.hadoop.ozone.lease.LeaseException; @@ -29,7 +28,9 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; +import org.apache.hadoop.ozone.scm.container.replication.ContainerSupervisor; import org.apache.hadoop.ozone.scm.exceptions.SCMException; import org.apache.hadoop.ozone.scm.node.NodeManager; import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector; @@ -74,6 +75,7 @@ public class ContainerMapping implements Mapping {
private final PipelineSelector pipelineSelector; private final ContainerStateManager containerStateManager; private final LeaseManager containerLeaseManager; + private final ContainerSupervisor containerSupervisor; private final float containerCloseThreshold; /** @@ -113,6 +115,9 @@ public ContainerMapping( this.pipelineSelector = new PipelineSelector(nodeManager, conf); this.containerStateManager = new ContainerStateManager(conf, this); + this.containerSupervisor = + new ContainerSupervisor(conf, nodeManager, + nodeManager.getNodePoolManager()); this.containerCloseThreshold = conf.getFloat( ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD, ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT); @@ -347,16 +352,14 @@ public ContainerStateManager getStateManager() { /** * Process container report from Datanode. * - * @param datanodeID Datanode ID - * @param reportType Type of report - * @param containerInfos container details + * @param reports Container report */ @Override - public void processContainerReports( - DatanodeID datanodeID, - ContainerReportsRequestProto.reportType reportType, - List - containerInfos) throws IOException { + public void processContainerReports(ContainerReportsRequestProto reports) + throws IOException { + List + containerInfos = reports.getReportsList(); + containerSupervisor.handleContainerReport(reports); for (StorageContainerDatanodeProtocolProtos.ContainerInfo containerInfo : containerInfos) { byte[] dbKey = containerInfo.getContainerNameBytes().toByteArray(); @@ -395,7 +398,7 @@ public void processContainerReports( // TODO: Handling of containers which are already in close queue. if (containerUsedPercentage >= containerCloseThreshold) { // TODO: The container has to be moved to close container queue. - // For now, we are just updating the container state to CLOSING. + // For now, we are just updating the container state to CLOSING. // Close container implementation can decide on how to maintain // list of containers to be closed, this is the place where we // have to add the containers to that list. @@ -412,7 +415,7 @@ public void processContainerReports( // Container not found in our container db. LOG.error("Error while processing container report from datanode:" + " {}, for container: {}, reason: container doesn't exist in" + - "container database.", datanodeID, + " container database.", reports.getDatanodeID(), containerInfo.getContainerName()); } } finally { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java index 577571f0fe3..0d442d133fb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java @@ -17,10 +17,7 @@ package org.apache.hadoop.ozone.scm.container; -import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; import org.apache.hadoop.scm.container.common.helpers.ContainerInfo; @@ -102,14 +99,9 @@ OzoneProtos.LifeCycleState updateContainerState(String containerName, /** * Process container report from Datanode.
* - * @param datanodeID Datanode ID - * @param reportType Type of report - * @param containerInfos container details + * @param reports Container report */ - void processContainerReports( - DatanodeID datanodeID, - ContainerReportsRequestProto.reportType reportType, - List - containerInfos) throws IOException; + void processContainerReports(ContainerReportsRequestProto reports) + throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerSupervisor.java similarity index 73% rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerSupervisor.java index f9b86f5b2ff..c063b8bc778 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerSupervisor.java @@ -19,12 +19,11 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.conf.OzoneConfiguration; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; import org.apache.hadoop.ozone.scm.exceptions.SCMException; -import org.apache.hadoop.ozone.scm.node.CommandQueue; import org.apache.hadoop.ozone.scm.node.NodeManager; import org.apache.hadoop.ozone.scm.node.NodePoolManager; import org.apache.hadoop.util.Time; @@ -43,6 +42,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import static com.google.common.util.concurrent.Uninterruptibles .sleepUninterruptibly; @@ -58,17 +59,20 @@ .OZONE_SCM_MAX_CONTAINER_REPORT_THREADS; import static org.apache.hadoop.scm.ScmConfigKeys .OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT; +import static org.apache.hadoop.scm.ScmConfigKeys + .OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS; +import static org.apache.hadoop.scm.ScmConfigKeys + .OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT; /** * This class takes a set of container reports that belong to a pool and then * computes the replication levels for each container. 
*/ -public class ContainerReplicationManager implements Closeable { +public class ContainerSupervisor implements Closeable { public static final Logger LOG = - LoggerFactory.getLogger(ContainerReplicationManager.class); + LoggerFactory.getLogger(ContainerSupervisor.class); private final NodePoolManager poolManager; - private final CommandQueue commandQueue; private final HashSet poolNames; private final PriorityQueue poolQueue; private final NodeManager nodeManager; @@ -79,6 +83,9 @@ public class ContainerReplicationManager implements Closeable { private long poolProcessCount; private final List inProgressPoolList; private final AtomicInteger threadFaultCount; + private final int inProgressPoolMaxCount; + + private final ReadWriteLock inProgressPoolListLock; /** * Returns the number of times we have processed pools. * @return long */ @@ -95,13 +102,10 @@ public long getPoolProcessCount() { * @param conf - Configuration * @param nodeManager - Node Manager * @param poolManager - Pool Manager - * @param commandQueue - Datanodes Command Queue. */ - public ContainerReplicationManager(OzoneConfiguration conf, - NodeManager nodeManager, NodePoolManager poolManager, - CommandQueue commandQueue) { + public ContainerSupervisor(Configuration conf, NodeManager nodeManager, + NodePoolManager poolManager) { Preconditions.checkNotNull(poolManager); - Preconditions.checkNotNull(commandQueue); Preconditions.checkNotNull(nodeManager); this.containerProcessingLag = conf.getTimeDuration(OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL, @@ -116,18 +120,21 @@ public ContainerReplicationManager(OzoneConfiguration conf, conf.getTimeDuration(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT, OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); + this.inProgressPoolMaxCount = conf.getInt( + OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS, + OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT); this.poolManager = poolManager; - this.commandQueue = commandQueue; this.nodeManager = nodeManager; this.poolNames = new HashSet<>(); this.poolQueue = new PriorityQueue<>(); - runnable = new AtomicBoolean(true); + this.runnable = new AtomicBoolean(true); this.threadFaultCount = new AtomicInteger(0); - executorService = HadoopExecutors.newCachedThreadPool( + this.executorService = HadoopExecutors.newCachedThreadPool( new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("Container Reports Processing Thread - %d") .build(), maxContainerReportThreads); - inProgressPoolList = new LinkedList<>(); + this.inProgressPoolList = new LinkedList<>(); + this.inProgressPoolListLock = new ReentrantReadWriteLock(); initPoolProcessThread(); } @@ -211,31 +218,49 @@ private void initPoolProcessThread() { while (runnable.get()) { // Make sure that we don't have any new pools.
refreshPools(); - PeriodicPool pool = poolQueue.poll(); - if (pool != null) { - if (pool.getLastProcessedTime() + this.containerProcessingLag < - Time.monotonicNow()) { - LOG.debug("Adding pool {} to container processing queue", pool - .getPoolName()); - InProgressPool inProgressPool = new InProgressPool(maxPoolWait, - pool, this.nodeManager, this.poolManager, this.commandQueue, - this.executorService); - inProgressPool.startReconciliation(); - inProgressPoolList.add(inProgressPool); - poolProcessCount++; - - } else { - - LOG.debug("Not within the time window for processing: {}", + while (inProgressPoolList.size() < inProgressPoolMaxCount) { + PeriodicPool pool = poolQueue.poll(); + if (pool != null) { + if (pool.getLastProcessedTime() + this.containerProcessingLag > + Time.monotonicNow()) { + LOG.debug("Not within the time window for processing: {}", + pool.getPoolName()); + // we might over sleep here, not a big deal. + sleepUninterruptibly(this.containerProcessingLag, + TimeUnit.MILLISECONDS); + } + LOG.debug("Adding pool {} to container processing queue", pool.getPoolName()); - // Put back this pool since we are not planning to process it. - poolQueue.add(pool); - // we might over sleep here, not a big deal. - sleepUninterruptibly(this.containerProcessingLag, - TimeUnit.MILLISECONDS); + InProgressPool inProgressPool = new InProgressPool(maxPoolWait, + pool, this.nodeManager, this.poolManager, this.executorService); + inProgressPool.startReconciliation(); + inProgressPoolListLock.writeLock().lock(); + try { + inProgressPoolList.add(inProgressPool); + } finally { + inProgressPoolListLock.writeLock().unlock(); + } + poolProcessCount++; + } else { + break; } } sleepUninterruptibly(this.maxPoolWait, TimeUnit.MILLISECONDS); + inProgressPoolListLock.readLock().lock(); + try { + for (InProgressPool inProgressPool : inProgressPoolList) { + inProgressPool.finalizeReconciliation(); + poolQueue.add(inProgressPool.getPool()); + } + } finally { + inProgressPoolListLock.readLock().unlock(); + } + inProgressPoolListLock.writeLock().lock(); + try { + inProgressPoolList.clear(); + } finally { + inProgressPoolListLock.writeLock().unlock(); + } } }; @@ -263,28 +288,28 @@ private void initPoolProcessThread() { */ public void handleContainerReport( ContainerReportsRequestProto containerReport) { - String poolName = null; - DatanodeID datanodeID = DatanodeID - .getFromProtoBuf(containerReport.getDatanodeID()); + DatanodeID datanodeID = DatanodeID.getFromProtoBuf( + containerReport.getDatanodeID()); + inProgressPoolListLock.readLock().lock(); try { - poolName = poolManager.getNodePool(datanodeID); + String poolName = poolManager.getNodePool(datanodeID); + for (InProgressPool ppool : inProgressPoolList) { + if (ppool.getPoolName().equalsIgnoreCase(poolName)) { + ppool.handleContainerReport(containerReport); + return; + } + } + // TODO: Decide if we can do anything else with this report. + LOG.debug("Discarding the container report for pool {}. " + + "That pool is not currently in the pool reconciliation process." 
+ + " Container Name: {}", poolName, containerReport.getDatanodeID()); } catch (SCMException e) { LOG.warn("Skipping processing container report from datanode {}, " + "cause: failed to get the corresponding node pool", datanodeID.toString(), e); - return; + } finally { + inProgressPoolListLock.readLock().unlock(); } - - for(InProgressPool ppool : inProgressPoolList) { - if(ppool.getPoolName().equalsIgnoreCase(poolName)) { - ppool.handleContainerReport(containerReport); - return; - } - } - // TODO: Decide if we can do anything else with this report. - LOG.debug("Discarding the container report for pool {}. That pool is not " + - "currently in the pool reconciliation process. Container Name: {}", - poolName, containerReport.getDatanodeID()); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java index 24423a37b02..833d1a8bfba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java @@ -21,9 +21,10 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerInfo; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; -import org.apache.hadoop.ozone.scm.node.CommandQueue; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerInfo; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; import org.apache.hadoop.ozone.scm.node.NodeManager; import org.apache.hadoop.ozone.scm.node.NodePoolManager; import org.apache.hadoop.util.Time; @@ -39,10 +40,14 @@ import java.util.function.Predicate; import java.util.stream.Collectors; -import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; -import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.HEALTHY; -import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.STALE; -import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.UNKNOWN; +import static com.google.common.util.concurrent.Uninterruptibles + .sleepUninterruptibly; +import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos + .NodeState.HEALTHY; +import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos + .NodeState.STALE; +import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos + .NodeState.UNKNOWN; /** * These are pools that are actively checking for replication status of the @@ -51,8 +56,8 @@ public final class InProgressPool { public static final Logger LOG = LoggerFactory.getLogger(InProgressPool.class); + private final PeriodicPool pool; - private final CommandQueue commandQueue; private final NodeManager nodeManager; private final NodePoolManager poolManager; private final ExecutorService executorService; @@ -70,22 +75,19 @@ public final class InProgressPool { * @param pool - Pool that we are working against * @param nodeManager - Nodemanager * @param poolManager - pool manager - * @param commandQueue - Command 
queue * @param executorService - Shared Executor service. */ InProgressPool(long maxWaitTime, PeriodicPool pool, NodeManager nodeManager, NodePoolManager poolManager, - CommandQueue commandQueue, ExecutorService executorService) { + ExecutorService executorService) { Preconditions.checkNotNull(pool); Preconditions.checkNotNull(nodeManager); Preconditions.checkNotNull(poolManager); - Preconditions.checkNotNull(commandQueue); Preconditions.checkNotNull(executorService); Preconditions.checkArgument(maxWaitTime > 0); this.pool = pool; this.nodeManager = nodeManager; this.poolManager = poolManager; - this.commandQueue = commandQueue; this.executorService = executorService; this.containerCountMap = new ConcurrentHashMap<>(); this.processedNodeSet = new ConcurrentHashMap<>(); @@ -186,7 +188,7 @@ public void startReconciliation() { // Queue commands to all datanodes in this pool to send us container // report. Since we ignore dead nodes, it is possible that we would have // over replicated the container if the node comes back. - commandQueue.addCommand(id, cmd); + nodeManager.addDatanodeCommand(id, cmd); } } this.status = ProgressStatus.InProgress; @@ -235,7 +237,12 @@ private NodeState getNodestate(DatanodeID id) { */ public void handleContainerReport( ContainerReportsRequestProto containerReport) { - executorService.submit(processContainerReport(containerReport)); + if (status == ProgressStatus.InProgress) { + executorService.submit(processContainerReport(containerReport)); + } else { + LOG.debug("Cannot handle container report when the pool is in {} status.", + status); + } } private Runnable processContainerReport( @@ -292,6 +299,11 @@ String getPoolName() { return pool.getPoolName(); } + public void finalizeReconciliation() { + status = ProgressStatus.Done; + // TODO: Add finalizing logic. This is where actual reconciliation happens. + } + /** * Current status of computing the replication status. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java index 266428bccc7..ce032b41a0d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java @@ -122,6 +122,12 @@ public interface NodeManager extends StorageContainerNodeProtocol, */ SCMNodeMetric getNodeStat(DatanodeID datanodeID); + /** + * Returns the NodePoolManager associated with the NodeManager. + * @return NodePoolManager + */ + NodePoolManager getNodePoolManager(); + /** * Wait for the heartbeat to be processed by NodeManager. * @return true if heartbeat has been processed.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java index ed894cbb571..129e65d46e7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java @@ -857,6 +857,11 @@ public SCMNodeMetric getNodeStat(DatanodeID datanodeID) { return new SCMNodeMetric(nodeStats.get(datanodeID.getDatanodeUuid())); } + @Override + public NodePoolManager getNodePoolManager() { + return nodePoolManager; + } + @Override public Map getNodeCount() { Map nodeCountMap = new HashMap(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodeManagerMock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodeManagerMock.java index 94f2a17fbe4..d9bf5878ae8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodeManagerMock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodeManagerMock.java @@ -29,26 +29,32 @@ .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat; +import org.apache.hadoop.ozone.scm.node.CommandQueue; import org.apache.hadoop.ozone.scm.node.NodeManager; import java.io.IOException; import java.util.List; import java.util.Map; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState; +import org.apache.hadoop.ozone.scm.node.NodePoolManager; +import org.mockito.Mockito; /** * A Node Manager to test replication. */ public class ReplicationNodeManagerMock implements NodeManager { private final Map nodeStateMap; + private final CommandQueue commandQueue; /** * A list of Datanodes and current states. * @param nodeState A node state map. */ - public ReplicationNodeManagerMock(Map nodeState) { + public ReplicationNodeManagerMock(Map nodeState, + CommandQueue commandQueue) { Preconditions.checkNotNull(nodeState); - nodeStateMap = nodeState; + this.nodeStateMap = nodeState; + this.commandQueue = commandQueue; } /** @@ -194,6 +200,11 @@ public SCMNodeMetric getNodeStat(DatanodeID datanodeID) { return null; } + @Override + public NodePoolManager getNodePoolManager() { + return Mockito.mock(NodePoolManager.class); + } + /** * Wait for the heartbeat to be processed by NodeManager.
* @@ -304,4 +315,9 @@ public void addNode(DatanodeID id, NodeState state) { nodeStateMap.put(id, state); } + @Override + public void addDatanodeCommand(DatanodeID id, SCMCommand command) { + this.commandQueue.addCommand(id, command); + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplicationManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java similarity index 88% rename from hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplicationManager.java rename to hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java index 0e363393877..89275965532 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplicationManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java @@ -28,8 +28,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; -import org.apache.hadoop.ozone.scm.container.replication - .ContainerReplicationManager; +import org.apache.hadoop.ozone.scm.container.replication.ContainerSupervisor; import org.apache.hadoop.ozone.scm.container.replication.InProgressPool; import org.apache.hadoop.ozone.scm.node.CommandQueue; import org.apache.hadoop.ozone.scm.node.NodeManager; @@ -53,35 +52,37 @@ import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.HEALTHY; import static org.apache.hadoop.scm.ScmConfigKeys .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT; +import static org.apache.hadoop.scm.ScmConfigKeys + .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL; import static org.apache.ratis.shaded.com.google.common.util.concurrent .Uninterruptibles.sleepUninterruptibly; /** * Tests for the container manager. */ -public class TestContainerReplicationManager { +public class TestContainerSupervisor { final static String POOL_NAME_TEMPLATE = "Pool%d"; static final int MAX_DATANODES = 72; static final int POOL_SIZE = 24; static final int POOL_COUNT = 3; private LogCapturer logCapturer = LogCapturer.captureLogs( - LogFactory.getLog(ContainerReplicationManager.class)); + LogFactory.getLog(ContainerSupervisor.class)); private List datanodes = new LinkedList<>(); private NodeManager nodeManager; private NodePoolManager poolManager; private CommandQueue commandQueue; - private ContainerReplicationManager replicationManager; + private ContainerSupervisor containerSupervisor; private ReplicationDatanodeStateManager datanodeStateManager; @After public void tearDown() throws Exception { logCapturer.stopCapturing(); - GenericTestUtils.setLogLevel(ContainerReplicationManager.LOG, Level.INFO); + GenericTestUtils.setLogLevel(ContainerSupervisor.LOG, Level.INFO); } @Before public void setUp() throws Exception { - GenericTestUtils.setLogLevel(ContainerReplicationManager.LOG, Level.DEBUG); + GenericTestUtils.setLogLevel(ContainerSupervisor.LOG, Level.DEBUG); Map nodeStateMap = new HashMap<>(); // We are setting up 3 pools with 24 nodes each in this cluster. // First we create 72 Datanodes. @@ -91,11 +92,13 @@ public void setUp() throws Exception { nodeStateMap.put(datanode, HEALTHY); } - // All nodes in this cluster are healthy for time being. 
- nodeManager = new ReplicationNodeManagerMock(nodeStateMap); - poolManager = new ReplicationNodePoolManagerMock(); commandQueue = new CommandQueue(); + // All nodes in this cluster are healthy for the time being. + nodeManager = new ReplicationNodeManagerMock(nodeStateMap, commandQueue); + poolManager = new ReplicationNodePoolManagerMock(); + + Assert.assertEquals("Max datanodes should be equal to POOL_SIZE * " + "POOL_COUNT", POOL_COUNT * POOL_SIZE, MAX_DATANODES); @@ -108,10 +111,12 @@ public void setUp() throws Exception { } } OzoneConfiguration config = SCMTestUtils.getOzoneConf(); - config.setTimeDuration(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT, 1, + config.setTimeDuration(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT, 2, TimeUnit.SECONDS); - replicationManager = new ContainerReplicationManager(config, - nodeManager, poolManager, commandQueue); + config.setTimeDuration(OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL, 1, + TimeUnit.SECONDS); + containerSupervisor = new ContainerSupervisor(config, + nodeManager, poolManager); datanodeStateManager = new ReplicationDatanodeStateManager(nodeManager, poolManager); // Sleep for one second to make sure all threads get time to run. @@ -125,13 +130,13 @@ public void setUp() throws Exception { public void testAssertPoolsAreProcessed() { // This asserts that replication manager has started processing at least // one pool. - Assert.assertTrue(replicationManager.getInProgressPoolCount() > 0); + Assert.assertTrue(containerSupervisor.getInProgressPoolCount() > 0); // Since all datanodes are flagged as healthy in this test, for each // datanode we must have queued a command. - Assert.assertEquals("Commands are in queue :", commandQueue - .getCommandsInQueue(), POOL_SIZE * replicationManager - .getInProgressPoolCount()); + Assert.assertEquals("Commands are in queue:", + POOL_SIZE * containerSupervisor.getInProgressPoolCount(), + commandQueue.getCommandsInQueue()); } @Test @@ -144,7 +149,7 @@ public void testDetectSingleContainerReplica() throws TimeoutException, InterruptedException { String singleNodeContainer = "SingleNodeContainer"; String threeNodeContainer = "ThreeNodeContainer"; - InProgressPool ppool = replicationManager.getInProcessPoolList().get(0); + InProgressPool ppool = containerSupervisor.getInProcessPoolList().get(0); // Only single datanode reporting that "SingleNodeContainer" exists. List clist = datanodeStateManager.getContainerReport(singleNodeContainer, @@ -180,7 +185,7 @@ public void testDetectOverReplica() throws TimeoutException, String normalContainer = "NormalContainer"; String overReplicated = "OverReplicatedContainer"; String wayOverReplicated = "WayOverReplicated"; - InProgressPool ppool = replicationManager.getInProcessPoolList().get(0); + InProgressPool ppool = containerSupervisor.getInProcessPoolList().get(0); List clist = datanodeStateManager.getContainerReport(normalContainer, @@ -221,7 +226,7 @@ public void testDetectOverReplica() throws TimeoutException, public void testAllPoolsAreProcessed() throws TimeoutException, InterruptedException { // Verify that we saw all three pools being picked up for processing.
- GenericTestUtils.waitFor(() -> replicationManager.getPoolProcessCount() + GenericTestUtils.waitFor(() -> containerSupervisor.getPoolProcessCount() >= 3, 200, 15 * 1000); Assert.assertTrue(logCapturer.getOutput().contains("Pool1") && logCapturer.getOutput().contains("Pool2") && @@ -253,7 +258,7 @@ public void testAddingNewPoolWorks() List clist = datanodeStateManager.getContainerReport("NewContainer1", "PoolNew", 1); - replicationManager.handleContainerReport(clist.get(0)); + containerSupervisor.handleContainerReport(clist.get(0)); GenericTestUtils.waitFor(() -> inProgressLog.getOutput().contains("NewContainer1") && inProgressLog .getOutput().contains(id.getDatanodeUuid()), 200, 10 * 1000); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java index 270929eabd5..13141265b37 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java @@ -35,6 +35,8 @@ import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.ozone.scm.node.NodeManager; +import org.apache.hadoop.ozone.scm.node.NodePoolManager; +import org.mockito.Mockito; import java.io.IOException; import java.util.HashMap; @@ -269,6 +271,11 @@ public SCMNodeMetric getNodeStat(DatanodeID datanodeID) { return new SCMNodeMetric(nodeMetricMap.get(datanodeID.toString())); } + @Override + public NodePoolManager getNodePoolManager() { + return Mockito.mock(NodePoolManager.class); + } + /** * Used for testing. 
* diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java index ac8dee944e9..56085e7eb63 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java @@ -203,8 +203,7 @@ public void testContainerCreationLeaseTimeout() throws IOException, } @Test - public void testFullContainerReport() throws IOException, - InterruptedException { + public void testFullContainerReport() throws IOException { String containerName = UUID.randomUUID().toString(); ContainerInfo info = createContainer(containerName); DatanodeID datanodeID = SCMTestUtils.getDatanodeID(); @@ -227,7 +226,13 @@ public void testFullContainerReport() throws IOException, .setContainerID(info.getContainerID()); reports.add(ciBuilder.build()); - mapping.processContainerReports(datanodeID, reportType, reports); + + ContainerReportsRequestProto.Builder crBuilder = + ContainerReportsRequestProto.newBuilder(); + crBuilder.setDatanodeID(datanodeID.getProtoBufMessage()) + .setType(reportType).addAllReports(reports); + + mapping.processContainerReports(crBuilder.build()); ContainerInfo updatedContainer = mapping.getContainer(containerName); Assert.assertEquals(100000000L, updatedContainer.getNumberOfKeys()); @@ -260,7 +265,12 @@ public void testContainerCloseWithContainerReport() throws IOException { reports.add(ciBuilder.build()); - mapping.processContainerReports(datanodeID, reportType, reports); + ContainerReportsRequestProto.Builder crBuilder = + ContainerReportsRequestProto.newBuilder(); + crBuilder.setDatanodeID(datanodeID.getProtoBufMessage()) + .setType(reportType).addAllReports(reports); + + mapping.processContainerReports(crBuilder.build()); ContainerInfo updatedContainer = mapping.getContainer(containerName); Assert.assertEquals(500000000L, updatedContainer.getNumberOfKeys());
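For reviewers, the updated call pattern for Mapping#processContainerReports is sketched below. It mirrors the updated TestContainerMapping code above, and assumes datanodeID, reportType, and reports have been prepared the same way the test prepares them:

    // Build the full report request and pass it through unchanged; the
    // datanode identity and report type now travel inside the proto.
    ContainerReportsRequestProto request = ContainerReportsRequestProto
        .newBuilder()
        .setDatanodeID(datanodeID.getProtoBufMessage())
        .setType(reportType)
        .addAllReports(reports)
        .build();
    // ContainerMapping first hands the whole request to the
    // ContainerSupervisor, then updates its own container database.
    mapping.processContainerReports(request);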
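Likewise, a minimal sketch of how the new node-pool knob and the NodeManager-provided pool manager fit together when wiring up the supervisor; the thread count of 4 and the nodeManager instance are illustrative assumptions, not part of this patch:

    // OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS caps how many node pools
    // are reconciled in parallel; the shipped default is 1.
    OzoneConfiguration conf = SCMTestUtils.getOzoneConf();
    conf.setInt(ScmConfigKeys.OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS, 4);
    // ContainerSupervisor no longer takes a CommandQueue: it queues datanode
    // commands via NodeManager#addDatanodeCommand and obtains the pool
    // topology from NodeManager#getNodePoolManager.
    ContainerSupervisor containerSupervisor =
        new ContainerSupervisor(conf, nodeManager,
            nodeManager.getNodePoolManager());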