HDFS-13070. Ozone: SCM: Support for container replica reconciliation - 1. Contributed by Nanda kumar.

This commit is contained in:
Nanda kumar 2018-02-21 00:40:00 +05:30 committed by Owen O'Malley
parent 4c10a849e8
commit b4a3cf1476
12 changed files with 203 additions and 118 deletions

View File

@ -223,6 +223,12 @@ public final class ScmConfigKeys {
public static final String
OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT = "60s";
/**
* This determines the total number of pools to be processed in parallel.
*/
public static final String OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS =
"ozone.scm.max.nodepool.processing.threads";
public static final int OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT = 1;
/**
* These 2 settings control the number of threads in executor pool and time
* outs for thw container reports from all nodes.

View File

@ -984,9 +984,7 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
updateContainerReportMetrics(reports);
// should we process container reports async?
scmContainerManager.processContainerReports(
DatanodeID.getFromProtoBuf(reports.getDatanodeID()),
reports.getType(), reports.getReportsList());
scmContainerManager.processContainerReports(reports);
return ContainerReportsResponseProto.newBuilder().build();
}

View File

@ -20,7 +20,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.lease.Lease;
import org.apache.hadoop.ozone.lease.LeaseException;
@ -29,7 +28,9 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.ozone.scm.container.replication.ContainerSupervisor;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
@ -74,6 +75,7 @@ public class ContainerMapping implements Mapping {
private final PipelineSelector pipelineSelector;
private final ContainerStateManager containerStateManager;
private final LeaseManager<ContainerInfo> containerLeaseManager;
private final ContainerSupervisor containerSupervisor;
private final float containerCloseThreshold;
/**
@ -113,6 +115,9 @@ public class ContainerMapping implements Mapping {
this.pipelineSelector = new PipelineSelector(nodeManager, conf);
this.containerStateManager =
new ContainerStateManager(conf, this);
this.containerSupervisor =
new ContainerSupervisor(conf, nodeManager,
nodeManager.getNodePoolManager());
this.containerCloseThreshold = conf.getFloat(
ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD,
ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT);
@ -347,16 +352,14 @@ public class ContainerMapping implements Mapping {
/**
* Process container report from Datanode.
*
* @param datanodeID Datanode ID
* @param reportType Type of report
* @param containerInfos container details
* @param reports Container report
*/
@Override
public void processContainerReports(
DatanodeID datanodeID,
ContainerReportsRequestProto.reportType reportType,
List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
containerInfos) throws IOException {
public void processContainerReports(ContainerReportsRequestProto reports)
throws IOException {
List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
containerInfos = reports.getReportsList();
containerSupervisor.handleContainerReport(reports);
for (StorageContainerDatanodeProtocolProtos.ContainerInfo containerInfo :
containerInfos) {
byte[] dbKey = containerInfo.getContainerNameBytes().toByteArray();
@ -395,7 +398,7 @@ public class ContainerMapping implements Mapping {
// TODO: Handling of containers which are already in close queue.
if (containerUsedPercentage >= containerCloseThreshold) {
// TODO: The container has to be moved to close container queue.
// For now, we are just updating the container state to CLOSED.
// For now, we are just updating the container state to CLOSING.
// Close container implementation can decide on how to maintain
// list of containers to be closed, this is the place where we
// have to add the containers to that list.
@ -412,7 +415,7 @@ public class ContainerMapping implements Mapping {
// Container not found in our container db.
LOG.error("Error while processing container report from datanode :" +
" {}, for container: {}, reason: container doesn't exist in" +
"container database.", datanodeID,
"container database.", reports.getDatanodeID(),
containerInfo.getContainerName());
}
} finally {

View File

@ -17,10 +17,7 @@
package org.apache.hadoop.ozone.scm.container;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
@ -102,14 +99,9 @@ public interface Mapping extends Closeable {
/**
* Process container report from Datanode.
*
* @param datanodeID Datanode ID
* @param reportType Type of report
* @param containerInfos container details
* @param reports Container report
*/
void processContainerReports(
DatanodeID datanodeID,
ContainerReportsRequestProto.reportType reportType,
List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
containerInfos) throws IOException;
void processContainerReports(ContainerReportsRequestProto reports)
throws IOException;
}

View File

@ -19,12 +19,11 @@ package org.apache.hadoop.ozone.scm.container.replication;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.apache.hadoop.ozone.scm.node.CommandQueue;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.scm.node.NodePoolManager;
import org.apache.hadoop.util.Time;
@ -43,6 +42,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static com.google.common.util.concurrent.Uninterruptibles
.sleepUninterruptibly;
@ -58,17 +59,20 @@ import static org.apache.hadoop.scm.ScmConfigKeys
.OZONE_SCM_MAX_CONTAINER_REPORT_THREADS;
import static org.apache.hadoop.scm.ScmConfigKeys
.OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT;
import static org.apache.hadoop.scm.ScmConfigKeys
.OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS;
import static org.apache.hadoop.scm.ScmConfigKeys
.OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT;
/**
* This class takes a set of container reports that belong to a pool and then
* computes the replication levels for each container.
*/
public class ContainerReplicationManager implements Closeable {
public class ContainerSupervisor implements Closeable {
public static final Logger LOG =
LoggerFactory.getLogger(ContainerReplicationManager.class);
LoggerFactory.getLogger(ContainerSupervisor.class);
private final NodePoolManager poolManager;
private final CommandQueue commandQueue;
private final HashSet<String> poolNames;
private final PriorityQueue<PeriodicPool> poolQueue;
private final NodeManager nodeManager;
@ -79,6 +83,9 @@ public class ContainerReplicationManager implements Closeable {
private long poolProcessCount;
private final List<InProgressPool> inProgressPoolList;
private final AtomicInteger threadFaultCount;
private final int inProgressPoolMaxCount;
private final ReadWriteLock inProgressPoolListLock;
/**
* Returns the number of times we have processed pools.
@ -95,13 +102,10 @@ public class ContainerReplicationManager implements Closeable {
* @param conf - OzoneConfiguration
* @param nodeManager - Node Manager
* @param poolManager - Pool Manager
* @param commandQueue - Datanodes Command Queue.
*/
public ContainerReplicationManager(OzoneConfiguration conf,
NodeManager nodeManager, NodePoolManager poolManager,
CommandQueue commandQueue) {
public ContainerSupervisor(Configuration conf, NodeManager nodeManager,
NodePoolManager poolManager) {
Preconditions.checkNotNull(poolManager);
Preconditions.checkNotNull(commandQueue);
Preconditions.checkNotNull(nodeManager);
this.containerProcessingLag =
conf.getTimeDuration(OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL,
@ -116,18 +120,21 @@ public class ContainerReplicationManager implements Closeable {
conf.getTimeDuration(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT,
OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
this.inProgressPoolMaxCount = conf.getInt(
OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS,
OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT);
this.poolManager = poolManager;
this.commandQueue = commandQueue;
this.nodeManager = nodeManager;
this.poolNames = new HashSet<>();
this.poolQueue = new PriorityQueue<>();
runnable = new AtomicBoolean(true);
this.runnable = new AtomicBoolean(true);
this.threadFaultCount = new AtomicInteger(0);
executorService = HadoopExecutors.newCachedThreadPool(
this.executorService = HadoopExecutors.newCachedThreadPool(
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Container Reports Processing Thread - %d")
.build(), maxContainerReportThreads);
inProgressPoolList = new LinkedList<>();
this.inProgressPoolList = new LinkedList<>();
this.inProgressPoolListLock = new ReentrantReadWriteLock();
initPoolProcessThread();
}
@ -211,31 +218,49 @@ public class ContainerReplicationManager implements Closeable {
while (runnable.get()) {
// Make sure that we don't have any new pools.
refreshPools();
PeriodicPool pool = poolQueue.poll();
if (pool != null) {
if (pool.getLastProcessedTime() + this.containerProcessingLag <
Time.monotonicNow()) {
LOG.debug("Adding pool {} to container processing queue", pool
.getPoolName());
InProgressPool inProgressPool = new InProgressPool(maxPoolWait,
pool, this.nodeManager, this.poolManager, this.commandQueue,
this.executorService);
inProgressPool.startReconciliation();
inProgressPoolList.add(inProgressPool);
poolProcessCount++;
} else {
LOG.debug("Not within the time window for processing: {}",
while (inProgressPoolList.size() < inProgressPoolMaxCount) {
PeriodicPool pool = poolQueue.poll();
if (pool != null) {
if (pool.getLastProcessedTime() + this.containerProcessingLag >
Time.monotonicNow()) {
LOG.debug("Not within the time window for processing: {}",
pool.getPoolName());
// we might over sleep here, not a big deal.
sleepUninterruptibly(this.containerProcessingLag,
TimeUnit.MILLISECONDS);
}
LOG.debug("Adding pool {} to container processing queue",
pool.getPoolName());
// Put back this pool since we are not planning to process it.
poolQueue.add(pool);
// we might over sleep here, not a big deal.
sleepUninterruptibly(this.containerProcessingLag,
TimeUnit.MILLISECONDS);
InProgressPool inProgressPool = new InProgressPool(maxPoolWait,
pool, this.nodeManager, this.poolManager, this.executorService);
inProgressPool.startReconciliation();
inProgressPoolListLock.writeLock().lock();
try {
inProgressPoolList.add(inProgressPool);
} finally {
inProgressPoolListLock.writeLock().unlock();
}
poolProcessCount++;
} else {
break;
}
}
sleepUninterruptibly(this.maxPoolWait, TimeUnit.MILLISECONDS);
inProgressPoolListLock.readLock().lock();
try {
for (InProgressPool inProgressPool : inProgressPoolList) {
inProgressPool.finalizeReconciliation();
poolQueue.add(inProgressPool.getPool());
}
} finally {
inProgressPoolListLock.readLock().unlock();
}
inProgressPoolListLock.writeLock().lock();
try {
inProgressPoolList.clear();
} finally {
inProgressPoolListLock.writeLock().unlock();
}
}
};
@ -263,28 +288,28 @@ public class ContainerReplicationManager implements Closeable {
*/
public void handleContainerReport(
ContainerReportsRequestProto containerReport) {
String poolName = null;
DatanodeID datanodeID = DatanodeID
.getFromProtoBuf(containerReport.getDatanodeID());
DatanodeID datanodeID = DatanodeID.getFromProtoBuf(
containerReport.getDatanodeID());
inProgressPoolListLock.readLock().lock();
try {
poolName = poolManager.getNodePool(datanodeID);
String poolName = poolManager.getNodePool(datanodeID);
for (InProgressPool ppool : inProgressPoolList) {
if (ppool.getPoolName().equalsIgnoreCase(poolName)) {
ppool.handleContainerReport(containerReport);
return;
}
}
// TODO: Decide if we can do anything else with this report.
LOG.debug("Discarding the container report for pool {}. " +
"That pool is not currently in the pool reconciliation process." +
" Container Name: {}", poolName, containerReport.getDatanodeID());
} catch (SCMException e) {
LOG.warn("Skipping processing container report from datanode {}, "
+ "cause: failed to get the corresponding node pool",
datanodeID.toString(), e);
return;
} finally {
inProgressPoolListLock.readLock().unlock();
}
for(InProgressPool ppool : inProgressPoolList) {
if(ppool.getPoolName().equalsIgnoreCase(poolName)) {
ppool.handleContainerReport(containerReport);
return;
}
}
// TODO: Decide if we can do anything else with this report.
LOG.debug("Discarding the container report for pool {}. That pool is not " +
"currently in the pool reconciliation process. Container Name: {}",
poolName, containerReport.getDatanodeID());
}
/**

View File

@ -21,9 +21,10 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerInfo;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.ozone.scm.node.CommandQueue;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerInfo;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.scm.node.NodePoolManager;
import org.apache.hadoop.util.Time;
@ -39,10 +40,14 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.HEALTHY;
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.STALE;
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.UNKNOWN;
import static com.google.common.util.concurrent.Uninterruptibles
.sleepUninterruptibly;
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos
.NodeState.HEALTHY;
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos
.NodeState.STALE;
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos
.NodeState.UNKNOWN;
/**
* These are pools that are actively checking for replication status of the
@ -51,8 +56,8 @@ import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.UNKNO
public final class InProgressPool {
public static final Logger LOG =
LoggerFactory.getLogger(InProgressPool.class);
private final PeriodicPool pool;
private final CommandQueue commandQueue;
private final NodeManager nodeManager;
private final NodePoolManager poolManager;
private final ExecutorService executorService;
@ -70,22 +75,19 @@ public final class InProgressPool {
* @param pool - Pool that we are working against
* @param nodeManager - Nodemanager
* @param poolManager - pool manager
* @param commandQueue - Command queue
* @param executorService - Shared Executor service.
*/
InProgressPool(long maxWaitTime, PeriodicPool pool,
NodeManager nodeManager, NodePoolManager poolManager,
CommandQueue commandQueue, ExecutorService executorService) {
ExecutorService executorService) {
Preconditions.checkNotNull(pool);
Preconditions.checkNotNull(nodeManager);
Preconditions.checkNotNull(poolManager);
Preconditions.checkNotNull(commandQueue);
Preconditions.checkNotNull(executorService);
Preconditions.checkArgument(maxWaitTime > 0);
this.pool = pool;
this.nodeManager = nodeManager;
this.poolManager = poolManager;
this.commandQueue = commandQueue;
this.executorService = executorService;
this.containerCountMap = new ConcurrentHashMap<>();
this.processedNodeSet = new ConcurrentHashMap<>();
@ -186,7 +188,7 @@ public final class InProgressPool {
// Queue commands to all datanodes in this pool to send us container
// report. Since we ignore dead nodes, it is possible that we would have
// over replicated the container if the node comes back.
commandQueue.addCommand(id, cmd);
nodeManager.addDatanodeCommand(id, cmd);
}
}
this.status = ProgressStatus.InProgress;
@ -235,7 +237,12 @@ public final class InProgressPool {
*/
public void handleContainerReport(
ContainerReportsRequestProto containerReport) {
executorService.submit(processContainerReport(containerReport));
if (status == ProgressStatus.InProgress) {
executorService.submit(processContainerReport(containerReport));
} else {
LOG.debug("Cannot handle container report when the pool is in {} status.",
status);
}
}
private Runnable processContainerReport(
@ -292,6 +299,11 @@ public final class InProgressPool {
return pool.getPoolName();
}
public void finalizeReconciliation() {
status = ProgressStatus.Done;
//TODO: Add finalizing logic. This is where actual reconciliation happens.
}
/**
* Current status of the computing replication status.
*/

View File

@ -122,6 +122,12 @@ public interface NodeManager extends StorageContainerNodeProtocol,
*/
SCMNodeMetric getNodeStat(DatanodeID datanodeID);
/**
* Returns the NodePoolManager associated with the NodeManager.
* @return NodePoolManager
*/
NodePoolManager getNodePoolManager();
/**
* Wait for the heartbeat is processed by NodeManager.
* @return true if heartbeat has been processed.

View File

@ -857,6 +857,11 @@ public class SCMNodeManager
return new SCMNodeMetric(nodeStats.get(datanodeID.getDatanodeUuid()));
}
@Override
public NodePoolManager getNodePoolManager() {
return nodePoolManager;
}
@Override
public Map<String, Integer> getNodeCount() {
Map<String, Integer> nodeCountMap = new HashMap<String, Integer>();

View File

@ -29,26 +29,32 @@ import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.ozone.scm.node.CommandQueue;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
import org.apache.hadoop.ozone.scm.node.NodePoolManager;
import org.mockito.Mockito;
/**
* A Node Manager to test replication.
*/
public class ReplicationNodeManagerMock implements NodeManager {
private final Map<DatanodeID, NodeState> nodeStateMap;
private final CommandQueue commandQueue;
/**
* A list of Datanodes and current states.
* @param nodeState A node state map.
*/
public ReplicationNodeManagerMock(Map<DatanodeID, NodeState> nodeState) {
public ReplicationNodeManagerMock(Map<DatanodeID, NodeState> nodeState,
CommandQueue commandQueue) {
Preconditions.checkNotNull(nodeState);
nodeStateMap = nodeState;
this.nodeStateMap = nodeState;
this.commandQueue = commandQueue;
}
/**
@ -194,6 +200,11 @@ public class ReplicationNodeManagerMock implements NodeManager {
return null;
}
@Override
public NodePoolManager getNodePoolManager() {
return Mockito.mock(NodePoolManager.class);
}
/**
* Wait for the heartbeat is processed by NodeManager.
*
@ -304,4 +315,9 @@ public class ReplicationNodeManagerMock implements NodeManager {
nodeStateMap.put(id, state);
}
@Override
public void addDatanodeCommand(DatanodeID id, SCMCommand command) {
this.commandQueue.addCommand(id, command);
}
}

View File

@ -28,8 +28,7 @@ import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.ozone.scm.container.replication
.ContainerReplicationManager;
import org.apache.hadoop.ozone.scm.container.replication.ContainerSupervisor;
import org.apache.hadoop.ozone.scm.container.replication.InProgressPool;
import org.apache.hadoop.ozone.scm.node.CommandQueue;
import org.apache.hadoop.ozone.scm.node.NodeManager;
@ -53,35 +52,37 @@ import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.HEALTHY;
import static org.apache.hadoop.scm.ScmConfigKeys
.OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT;
import static org.apache.hadoop.scm.ScmConfigKeys
.OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL;
import static org.apache.ratis.shaded.com.google.common.util.concurrent
.Uninterruptibles.sleepUninterruptibly;
/**
* Tests for the container manager.
*/
public class TestContainerReplicationManager {
public class TestContainerSupervisor {
final static String POOL_NAME_TEMPLATE = "Pool%d";
static final int MAX_DATANODES = 72;
static final int POOL_SIZE = 24;
static final int POOL_COUNT = 3;
private LogCapturer logCapturer = LogCapturer.captureLogs(
LogFactory.getLog(ContainerReplicationManager.class));
LogFactory.getLog(ContainerSupervisor.class));
private List<DatanodeID> datanodes = new LinkedList<>();
private NodeManager nodeManager;
private NodePoolManager poolManager;
private CommandQueue commandQueue;
private ContainerReplicationManager replicationManager;
private ContainerSupervisor containerSupervisor;
private ReplicationDatanodeStateManager datanodeStateManager;
@After
public void tearDown() throws Exception {
logCapturer.stopCapturing();
GenericTestUtils.setLogLevel(ContainerReplicationManager.LOG, Level.INFO);
GenericTestUtils.setLogLevel(ContainerSupervisor.LOG, Level.INFO);
}
@Before
public void setUp() throws Exception {
GenericTestUtils.setLogLevel(ContainerReplicationManager.LOG, Level.DEBUG);
GenericTestUtils.setLogLevel(ContainerSupervisor.LOG, Level.DEBUG);
Map<DatanodeID, NodeState> nodeStateMap = new HashMap<>();
// We are setting up 3 pools with 24 nodes each in this cluster.
// First we create 72 Datanodes.
@ -91,11 +92,13 @@ public class TestContainerReplicationManager {
nodeStateMap.put(datanode, HEALTHY);
}
// All nodes in this cluster are healthy for time being.
nodeManager = new ReplicationNodeManagerMock(nodeStateMap);
poolManager = new ReplicationNodePoolManagerMock();
commandQueue = new CommandQueue();
// All nodes in this cluster are healthy for time being.
nodeManager = new ReplicationNodeManagerMock(nodeStateMap, commandQueue);
poolManager = new ReplicationNodePoolManagerMock();
Assert.assertEquals("Max datanodes should be equal to POOL_SIZE * " +
"POOL_COUNT", POOL_COUNT * POOL_SIZE, MAX_DATANODES);
@ -108,10 +111,12 @@ public class TestContainerReplicationManager {
}
}
OzoneConfiguration config = SCMTestUtils.getOzoneConf();
config.setTimeDuration(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT, 1,
config.setTimeDuration(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT, 2,
TimeUnit.SECONDS);
replicationManager = new ContainerReplicationManager(config,
nodeManager, poolManager, commandQueue);
config.setTimeDuration(OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL, 1,
TimeUnit.SECONDS);
containerSupervisor = new ContainerSupervisor(config,
nodeManager, poolManager);
datanodeStateManager = new ReplicationDatanodeStateManager(nodeManager,
poolManager);
// Sleep for one second to make sure all threads get time to run.
@ -125,13 +130,13 @@ public class TestContainerReplicationManager {
public void testAssertPoolsAreProcessed() {
// This asserts that replication manager has started processing at least
// one pool.
Assert.assertTrue(replicationManager.getInProgressPoolCount() > 0);
Assert.assertTrue(containerSupervisor.getInProgressPoolCount() > 0);
// Since all datanodes are flagged as healthy in this test, for each
// datanode we must have queued a command.
Assert.assertEquals("Commands are in queue :", commandQueue
.getCommandsInQueue(), POOL_SIZE * replicationManager
.getInProgressPoolCount());
Assert.assertEquals("Commands are in queue :",
POOL_SIZE * containerSupervisor.getInProgressPoolCount(),
commandQueue.getCommandsInQueue());
}
@Test
@ -144,7 +149,7 @@ public class TestContainerReplicationManager {
InterruptedException {
String singleNodeContainer = "SingleNodeContainer";
String threeNodeContainer = "ThreeNodeContainer";
InProgressPool ppool = replicationManager.getInProcessPoolList().get(0);
InProgressPool ppool = containerSupervisor.getInProcessPoolList().get(0);
// Only single datanode reporting that "SingleNodeContainer" exists.
List<ContainerReportsRequestProto> clist =
datanodeStateManager.getContainerReport(singleNodeContainer,
@ -180,7 +185,7 @@ public class TestContainerReplicationManager {
String normalContainer = "NormalContainer";
String overReplicated = "OverReplicatedContainer";
String wayOverReplicated = "WayOverReplicated";
InProgressPool ppool = replicationManager.getInProcessPoolList().get(0);
InProgressPool ppool = containerSupervisor.getInProcessPoolList().get(0);
List<ContainerReportsRequestProto> clist =
datanodeStateManager.getContainerReport(normalContainer,
@ -221,7 +226,7 @@ public class TestContainerReplicationManager {
public void testAllPoolsAreProcessed() throws TimeoutException,
InterruptedException {
// Verify that we saw all three pools being picked up for processing.
GenericTestUtils.waitFor(() -> replicationManager.getPoolProcessCount()
GenericTestUtils.waitFor(() -> containerSupervisor.getPoolProcessCount()
>= 3, 200, 15 * 1000);
Assert.assertTrue(logCapturer.getOutput().contains("Pool1") &&
logCapturer.getOutput().contains("Pool2") &&
@ -253,7 +258,7 @@ public class TestContainerReplicationManager {
List<ContainerReportsRequestProto> clist =
datanodeStateManager.getContainerReport("NewContainer1",
"PoolNew", 1);
replicationManager.handleContainerReport(clist.get(0));
containerSupervisor.handleContainerReport(clist.get(0));
GenericTestUtils.waitFor(() ->
inProgressLog.getOutput().contains("NewContainer1") && inProgressLog
.getOutput().contains(id.getDatanodeUuid()), 200, 10 * 1000);

View File

@ -35,6 +35,8 @@ import org.apache.hadoop.ozone.protocol.proto
import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.scm.node.NodePoolManager;
import org.mockito.Mockito;
import java.io.IOException;
import java.util.HashMap;
@ -269,6 +271,11 @@ public class MockNodeManager implements NodeManager {
return new SCMNodeMetric(nodeMetricMap.get(datanodeID.toString()));
}
@Override
public NodePoolManager getNodePoolManager() {
return Mockito.mock(NodePoolManager.class);
}
/**
* Used for testing.
*

View File

@ -203,8 +203,7 @@ public class TestContainerMapping {
}
@Test
public void testFullContainerReport() throws IOException,
InterruptedException {
public void testFullContainerReport() throws IOException {
String containerName = UUID.randomUUID().toString();
ContainerInfo info = createContainer(containerName);
DatanodeID datanodeID = SCMTestUtils.getDatanodeID();
@ -227,7 +226,13 @@ public class TestContainerMapping {
.setContainerID(info.getContainerID());
reports.add(ciBuilder.build());
mapping.processContainerReports(datanodeID, reportType, reports);
ContainerReportsRequestProto.Builder crBuilder =
ContainerReportsRequestProto.newBuilder();
crBuilder.setDatanodeID(datanodeID.getProtoBufMessage())
.setType(reportType).addAllReports(reports);
mapping.processContainerReports(crBuilder.build());
ContainerInfo updatedContainer = mapping.getContainer(containerName);
Assert.assertEquals(100000000L, updatedContainer.getNumberOfKeys());
@ -260,7 +265,12 @@ public class TestContainerMapping {
reports.add(ciBuilder.build());
mapping.processContainerReports(datanodeID, reportType, reports);
ContainerReportsRequestProto.Builder crBuilder =
ContainerReportsRequestProto.newBuilder();
crBuilder.setDatanodeID(datanodeID.getProtoBufMessage())
.setType(reportType).addAllReports(reports);
mapping.processContainerReports(crBuilder.build());
ContainerInfo updatedContainer = mapping.getContainer(containerName);
Assert.assertEquals(500000000L, updatedContainer.getNumberOfKeys());