HDFS-11493. Ozone: SCM: Add the ability to handle container reports. Contributed by Anu Engineer.
parent 5341fa13af
commit 8d37ef30e1
HadoopExecutors.java
@@ -20,6 +20,8 @@
 package org.apache.hadoop.util.concurrent;

+import org.slf4j.Logger;
+
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -41,6 +43,14 @@ public final class HadoopExecutors {
         threadFactory);
   }

+  public static ExecutorService newCachedThreadPool(ThreadFactory
+      threadFactory, int maxThreads) {
+    return new HadoopThreadPoolExecutor(0, maxThreads,
+        60L, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(),
+        threadFactory);
+  }
+
   public static ExecutorService newFixedThreadPool(int nThreads,
       ThreadFactory threadFactory) {
     return new HadoopThreadPoolExecutor(nThreads, nThreads,
@@ -91,6 +101,37 @@ public final class HadoopExecutors {
     return Executors.newSingleThreadScheduledExecutor(threadFactory);
   }

+  /**
+   * Helper routine to shut down an executorService.
+   * @param executorService - executorService
+   * @param logger - Logger
+   * @param timeout - Timeout
+   * @param unit - TimeUnit, generally seconds.
+   */
+  public static void shutdown(ExecutorService executorService, Logger logger,
+      long timeout, TimeUnit unit) {
+    try {
+      if (executorService != null) {
+        executorService.shutdown();
+        try {
+          if (!executorService.awaitTermination(timeout, unit)) {
+            executorService.shutdownNow();
+          }
+
+          if (!executorService.awaitTermination(timeout, unit)) {
+            logger.error("Unable to shutdown properly.");
+          }
+        } catch (InterruptedException e) {
+          logger.error("Error attempting to shutdown.", e);
+          executorService.shutdownNow();
+        }
+      }
+    } catch (Exception e) {
+      logger.error("Error during shutdown: ", e);
+      throw e;
+    }
+  }
+
   //disable instantiation
   private HadoopExecutors() { }
 }
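Taken together, the bounded newCachedThreadPool overload and the shutdown helper let a caller build and later drain a worker pool in a couple of calls. A minimal usage sketch, assuming the demo class name and logger below (they are not part of the patch):

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class ShutdownDemo {
  private static final Logger LOG = LoggerFactory.getLogger(ShutdownDemo.class);

  public static void main(String[] args) {
    // Cached pool capped at 10 daemon threads, as added by this patch.
    ExecutorService workers = HadoopExecutors.newCachedThreadPool(
        new ThreadFactoryBuilder().setDaemon(true)
            .setNameFormat("demo-worker-%d").build(), 10);
    workers.submit(() -> LOG.info("doing some work"));
    // Best-effort drain: waits up to 5 seconds, escalates to shutdownNow(),
    // and logs (rather than throws) if tasks still refuse to stop.
    HadoopExecutors.shutdown(workers, LOG, 5, TimeUnit.SECONDS);
  }
}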
ScmConfigKeys.java
@@ -190,6 +190,29 @@ public final class ScmConfigKeys {
   public static final int OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT = 1;

+
+  /**
+   * Don't start processing a pool if we have not had a minimum number of
+   * seconds from the last processing.
+   */
+  public static final String
+      OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_SECONDS =
+      "ozone.scm.container.report.processing.interval.seconds";
+  public static final int
+      OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT = 60;
+
+  /**
+   * These two settings control the number of threads in the executor pool
+   * and the timeout for the container reports from all nodes.
+   */
+  public static final String OZONE_SCM_MAX_CONTAINER_REPORT_THREADS =
+      "ozone.scm.max.container.report.threads";
+  public static final int OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT = 100;
+  public static final String OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_SECONDS =
+      "ozone.scm.container.reports.wait.timeout.seconds";
+  public static final int OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT =
+      300; // Default 5 minute wait.
+
   /**
    * Never constructed.
    */
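The three new keys are ordinary Ozone configuration entries, so they can be overridden in the site configuration or programmatically. A hedged sketch of a programmatic override; the chosen values are purely illustrative, not recommendations:

import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.scm.ScmConfigKeys;

public class ReportTuningDemo {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Re-process a pool at most once every 2 minutes (default 60 seconds).
    conf.setInt(
        ScmConfigKeys.OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_SECONDS,
        120);
    // Cap the report-processing executor at 32 threads (default 100).
    conf.setInt(ScmConfigKeys.OZONE_SCM_MAX_CONTAINER_REPORT_THREADS, 32);
    // Wait up to 10 minutes for a pool's reports (default 300 seconds).
    conf.setInt(
        ScmConfigKeys.OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_SECONDS, 600);
    System.out.println(conf.getInt(
        ScmConfigKeys.OZONE_SCM_MAX_CONTAINER_REPORT_THREADS, 100));
  }
}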
ContainerReplicationManager.java (new file)
@@ -0,0 +1,296 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.hadoop.ozone.scm.container.replication;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.protocol.proto
    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.ozone.scm.node.CommandQueue;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.scm.node.NodePoolManager;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static com.google.common.util.concurrent.Uninterruptibles
    .sleepUninterruptibly;
import static org.apache.hadoop.scm.ScmConfigKeys
    .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT;
import static org.apache.hadoop.scm.ScmConfigKeys
    .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_SECONDS;
import static org.apache.hadoop.scm.ScmConfigKeys
    .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT;
import static org.apache.hadoop.scm.ScmConfigKeys
    .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_SECONDS;
import static org.apache.hadoop.scm.ScmConfigKeys
    .OZONE_SCM_MAX_CONTAINER_REPORT_THREADS;
import static org.apache.hadoop.scm.ScmConfigKeys
    .OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT;

/**
 * This class takes a set of container reports that belong to a pool and then
 * computes the replication levels for each container.
 */
public class ContainerReplicationManager implements Closeable {
  static final Logger LOG =
      LoggerFactory.getLogger(ContainerReplicationManager.class);

  private final NodePoolManager poolManager;
  private final CommandQueue commandQueue;
  private final HashSet<String> poolNames;
  private final PriorityQueue<PeriodicPool> poolQueue;
  private final NodeManager nodeManager;
  private final int containerProcessingLag;
  private final AtomicBoolean runnable;
  private final ExecutorService executorService;
  private final int maxPoolWait;
  private long poolProcessCount;
  private final List<InProgressPool> inProgressPoolList;
  private final AtomicInteger threadFaultCount;

  /**
   * Returns the number of times we have processed pools.
   * @return long
   */
  public long getPoolProcessCount() {
    return poolProcessCount;
  }

  /**
   * Constructs a class that computes Replication Levels.
   *
   * @param conf - OzoneConfiguration
   * @param nodeManager - Node Manager
   * @param poolManager - Pool Manager
   * @param commandQueue - Datanodes Command Queue.
   */
  public ContainerReplicationManager(OzoneConfiguration conf,
      NodeManager nodeManager, NodePoolManager poolManager,
      CommandQueue commandQueue) {
    Preconditions.checkNotNull(poolManager);
    Preconditions.checkNotNull(commandQueue);
    Preconditions.checkNotNull(nodeManager);
    this.containerProcessingLag =
        conf.getInt(OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_SECONDS,
            OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT) * 1000;
    int maxContainerReportThreads =
        conf.getInt(OZONE_SCM_MAX_CONTAINER_REPORT_THREADS,
            OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT);
    this.maxPoolWait =
        conf.getInt(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_SECONDS,
            OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT) * 1000;
    this.poolManager = poolManager;
    this.commandQueue = commandQueue;
    this.nodeManager = nodeManager;
    this.poolNames = new HashSet<>();
    this.poolQueue = new PriorityQueue<>();
    runnable = new AtomicBoolean(true);
    this.threadFaultCount = new AtomicInteger(0);
    executorService = HadoopExecutors.newCachedThreadPool(
        new ThreadFactoryBuilder().setDaemon(true)
            .setNameFormat("Container Reports Processing Thread - %d")
            .build(), maxContainerReportThreads);
    inProgressPoolList = new LinkedList<>();

    initPoolProcessThread();
  }

  /**
   * Returns the number of pools that are under process right now.
   * @return int - Number of pools that are in process.
   */
  public int getInProgressPoolCount() {
    return inProgressPoolList.size();
  }

  /**
   * Exits the background thread.
   */
  public void setExit() {
    this.runnable.set(false);
  }

  /**
   * Adds or removes pools from the set of names that we need to process.
   *
   * There are two different cases that we need to handle: the case where
   * pools are being added, and the case where pools are removed.
   */
  private void refreshPools() {
    List<String> pools = this.poolManager.getNodePools();
    if (pools != null) {

      HashSet<String> removedPools =
          computePoolDifference(this.poolNames, new HashSet<>(pools));

      HashSet<String> addedPools =
          computePoolDifference(new HashSet<>(pools), this.poolNames);
      // TODO: Support remove pool API in pool manager so that this code
      // path can be tested. This never happens in the current code base.
      for (String poolName : removedPools) {
        for (PeriodicPool periodicPool : poolQueue) {
          if (periodicPool.getPoolName().compareTo(poolName) == 0) {
            poolQueue.remove(periodicPool);
          }
        }
      }
      // Remove the pool names that we have in the list.
      this.poolNames.removeAll(removedPools);

      for (String poolName : addedPools) {
        poolQueue.add(new PeriodicPool(poolName));
      }

      // Add to the pool names we are tracking.
      poolNames.addAll(addedPools);
    }
  }

  /**
   * Computes the set difference: pools that are in newPools but not in
   * oldPool.
   *
   * @param newPools - New Pools list
   * @param oldPool - oldPool List.
   * @return the pools present only in newPools.
   */
  private HashSet<String> computePoolDifference(HashSet<String> newPools,
      Set<String> oldPool) {
    Preconditions.checkNotNull(newPools);
    Preconditions.checkNotNull(oldPool);
    HashSet<String> newSet = new HashSet<>(newPools);
    newSet.removeAll(oldPool);
    return newSet;
  }

  private void initPoolProcessThread() {

    /*
     * Task that runs to check if we need to start a pool processing job.
     * If so, we create a pool reconciliation job and find out if all the
     * expected containers are on the nodes.
     */
    Runnable processPools = () -> {
      while (runnable.get()) {
        // Make sure that we don't have any new pools.
        refreshPools();
        PeriodicPool pool = poolQueue.poll();
        if (pool != null) {
          if (pool.getLastProcessedTime() + this.containerProcessingLag <
              Time.monotonicNow()) {
            LOG.debug("Adding pool {} to container processing queue",
                pool.getPoolName());
            InProgressPool inProgressPool = new InProgressPool(maxPoolWait,
                pool, this.nodeManager, this.poolManager, this.commandQueue,
                this.executorService);
            inProgressPool.startReconciliation();
            inProgressPoolList.add(inProgressPool);
            poolProcessCount++;
          } else {
            LOG.debug("Not within the time window for processing: {}",
                pool.getPoolName());
            // Put back this pool since we are not planning to process it.
            poolQueue.add(pool);
            // we might over sleep here, not a big deal.
            sleepUninterruptibly(this.containerProcessingLag,
                TimeUnit.MILLISECONDS);
          }
        }
        sleepUninterruptibly(this.maxPoolWait, TimeUnit.MILLISECONDS);
      }
    };

    // We will have only one thread for pool processing.
    Thread poolProcessThread = new Thread(processPools);
    poolProcessThread.setDaemon(true);
    poolProcessThread.setName("Pool replica thread");
    poolProcessThread.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
      // Let us just restart this thread after logging a critical error.
      // If this thread is not running we cannot handle commands from SCM.
      LOG.error("Critical Error : Pool replica thread encountered an " +
          "error. Thread: {} Error Count : {}", t.toString(), e,
          threadFaultCount.incrementAndGet());
      poolProcessThread.start();
      // TODO : Add a config to restrict how many times we will restart this
      // thread in a single session.
    });
    poolProcessThread.start();
  }

  /**
   * Adds a container report to the appropriate inProgress Pool.
   * @param containerReport -- Container report for a specific container from
   * a datanode.
   */
  public void handleContainerReport(ContainerReportsProto containerReport) {
    String poolName = poolManager.getNodePool(
        DatanodeID.getFromProtoBuf(containerReport.getDatanodeID()));

    for (InProgressPool ppool : inProgressPoolList) {
      if (ppool.getPoolName().equalsIgnoreCase(poolName)) {
        ppool.handleContainerReport(containerReport);
        return;
      }
    }
    // TODO: Decide if we can do anything else with this report.
    LOG.debug("Discarding the container report for pool {}. That pool is " +
        "not currently in the pool reconciliation process. " +
        "Container Name: {}", poolName, containerReport.getDatanodeID());
  }

  /**
   * Get the in-process pool list, used for testing.
   * @return List of InProgressPool
   */
  @VisibleForTesting
  public List<InProgressPool> getInProcessPoolList() {
    return inProgressPoolList;
  }

  /**
   * Shutdown the Container Replication Manager.
   * @throws IOException if an I/O error occurs
   */
  @Override
  public void close() throws IOException {
    setExit();
    HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS);
  }
}
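The SCM-side wiring is not part of this hunk; the sketch below only illustrates the call sequence the class is designed for, based on its constructor and handleContainerReport. The manager, pool manager and command queue instances are assumed to be supplied by the SCM:

import java.io.IOException;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.protocol.proto
    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.ozone.scm.container.replication.ContainerReplicationManager;
import org.apache.hadoop.ozone.scm.node.CommandQueue;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.scm.node.NodePoolManager;

public class ReplicationWiringSketch {
  // Illustrative only: the managers are assumed to be the SCM's own instances.
  public static void run(OzoneConfiguration conf, NodeManager nodes,
      NodePoolManager pools, CommandQueue queue,
      ContainerReportsProto reportFromDatanode) throws IOException {
    ContainerReplicationManager mgr =
        new ContainerReplicationManager(conf, nodes, pools, queue);
    try {
      // Reports arriving over the datanode protocol are handed straight in;
      // the manager routes each one to the pool currently being reconciled,
      // or drops it if that pool is not in progress.
      mgr.handleContainerReport(reportFromDatanode);
    } finally {
      mgr.close(); // stops the pool-processing thread and the executor
    }
  }
}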
InProgressPool.java (new file)
@@ -0,0 +1,302 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.hadoop.ozone.scm.container.replication;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand;
import org.apache.hadoop.ozone.protocol.proto
    .StorageContainerDatanodeProtocolProtos.ContainerInfo;
import org.apache.hadoop.ozone.protocol.proto
    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.ozone.scm.node.CommandQueue;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.scm.node.NodePoolManager;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static com.google.common.util.concurrent.Uninterruptibles
    .sleepUninterruptibly;
import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.HEALTHY;
import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.STALE;
import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.UNKNOWN;

/**
 * These are pools that are actively checking for the replication status of
 * the containers.
 */
public final class InProgressPool {
  public static final Logger LOG =
      LoggerFactory.getLogger(InProgressPool.class);
  private final PeriodicPool pool;
  private final CommandQueue commandQueue;
  private final NodeManager nodeManager;
  private final NodePoolManager poolManager;
  private final ExecutorService executorService;
  private final Map<String, Integer> containerCountMap;
  private final Map<String, Boolean> processedNodeSet;
  private final long startTime;
  private ProgressStatus status;
  private AtomicInteger nodeCount;
  private AtomicInteger nodeProcessed;
  private AtomicInteger containerProcessedCount;
  private int maxWaitTime;

  /**
   * Constructs a pool that is being processed.
   *
   * @param maxWaitTime - Maximum wait time in milliseconds.
   * @param pool - Pool that we are working against
   * @param nodeManager - Nodemanager
   * @param poolManager - pool manager
   * @param commandQueue - Command queue
   * @param executorService - Shared Executor service.
   */
  InProgressPool(int maxWaitTime, PeriodicPool pool,
      NodeManager nodeManager, NodePoolManager poolManager,
      CommandQueue commandQueue, ExecutorService executorService) {
    Preconditions.checkNotNull(pool);
    Preconditions.checkNotNull(nodeManager);
    Preconditions.checkNotNull(poolManager);
    Preconditions.checkNotNull(commandQueue);
    Preconditions.checkNotNull(executorService);
    Preconditions.checkArgument(maxWaitTime > 0);
    this.pool = pool;
    this.nodeManager = nodeManager;
    this.poolManager = poolManager;
    this.commandQueue = commandQueue;
    this.executorService = executorService;
    this.containerCountMap = new ConcurrentHashMap<>();
    this.processedNodeSet = new ConcurrentHashMap<>();
    this.maxWaitTime = maxWaitTime;
    startTime = Time.monotonicNow();
  }

  /**
   * Returns the periodic pool.
   *
   * @return PeriodicPool
   */
  public PeriodicPool getPool() {
    return pool;
  }

  /**
   * We are done if we have got reports from all nodes or we are
   * done waiting for the specified time.
   *
   * @return true if we are done, false otherwise.
   */
  public boolean isDone() {
    return (nodeCount.get() == nodeProcessed.get()) ||
        (this.startTime + this.maxWaitTime) > Time.monotonicNow();
  }

  /**
   * Gets the number of containers processed.
   *
   * @return int
   */
  public int getContainerProcessedCount() {
    return containerProcessedCount.get();
  }

  /**
   * Returns the start time in milliseconds.
   *
   * @return - Start Time.
   */
  public long getStartTime() {
    return startTime;
  }

  /**
   * Get the number of nodes in this pool.
   *
   * @return - node count
   */
  public int getNodeCount() {
    return nodeCount.get();
  }

  /**
   * Get the number of nodes that we have already processed container reports
   * from.
   *
   * @return - Processed count.
   */
  public int getNodeProcessed() {
    return nodeProcessed.get();
  }

  /**
   * Returns the current status.
   *
   * @return Status
   */
  public ProgressStatus getStatus() {
    return status;
  }

  /**
   * Starts the reconciliation process for all the nodes in the pool.
   */
  public void startReconciliation() {
    List<DatanodeID> datanodeIDList =
        this.poolManager.getNodes(pool.getPoolName());
    if (datanodeIDList.size() == 0) {
      LOG.error("Datanode list for {} is Empty. Pool with no nodes ? ",
          pool.getPoolName());
      this.status = ProgressStatus.Error;
      return;
    }

    nodeProcessed = new AtomicInteger(0);
    containerProcessedCount = new AtomicInteger(0);
    nodeCount = new AtomicInteger(0);
    /*
     * Ask each datanode to send us commands.
     */
    SendContainerCommand cmd = SendContainerCommand.newBuilder().build();
    for (DatanodeID id : datanodeIDList) {
      NodeManager.NODESTATE currentState = getNodestate(id);
      if (currentState == HEALTHY || currentState == STALE) {
        nodeCount.incrementAndGet();
        // Queue commands to all datanodes in this pool to send us container
        // reports. Since we ignore dead nodes, it is possible that we would
        // have over-replicated the container if the node comes back.
        commandQueue.addCommand(id, cmd);
      }
    }
    this.status = ProgressStatus.InProgress;
    this.getPool().setLastProcessedTime(Time.monotonicNow());
  }

  /**
   * Gets the node state.
   *
   * @param id - datanode ID.
   * @return NodeState.
   */
  private NodeManager.NODESTATE getNodestate(DatanodeID id) {
    NodeManager.NODESTATE currentState = UNKNOWN;
    int maxTry = 100;
    // We need to loop to make sure that we will retry if we get
    // node state unknown. This can lead to an infinite loop if we send
    // in an unknown node ID, so a max try count is used to prevent it.

    int currentTry = 0;
    while (currentState == UNKNOWN && currentTry < maxTry) {
      // Retry to make sure that we deal with the case of node state not
      // known.
      currentState = nodeManager.getNodeState(id);
      currentTry++;
      if (currentState == UNKNOWN) {
        // Sleep to make sure that this is not a tight loop.
        sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
      }
    }
    if (currentState == UNKNOWN) {
      LOG.error("Not able to determine the state of Node: {}, Exceeded max " +
          "try and node manager returns UNKNOWN state. This indicates we " +
          "are dealing with a node that we don't know about.", id);
    }
    return currentState;
  }

  /**
   * Queues a container report for handling. This is done in a worker thread
   * since decoding a container report might be compute intensive. We don't
   * want to block since we have asked for a bunch of container reports
   * from a set of datanodes.
   *
   * @param containerReport - ContainerReport
   */
  public void handleContainerReport(ContainerReportsProto containerReport) {
    executorService.submit(processContainerReport(containerReport));
  }

  private Runnable processContainerReport(ContainerReportsProto reports) {
    return () -> {
      DatanodeID datanodeID =
          DatanodeID.getFromProtoBuf(reports.getDatanodeID());
      if (processedNodeSet.computeIfAbsent(datanodeID.getDatanodeUuid(),
          (k) -> true)) {
        nodeProcessed.incrementAndGet();
        LOG.debug("Total Nodes processed : {} Node Name: {} ", nodeProcessed,
            datanodeID.getDatanodeUuid());
        for (ContainerInfo info : reports.getReportsList()) {
          containerProcessedCount.incrementAndGet();
          LOG.debug("Total Containers processed: {} Container Name: {}",
              containerProcessedCount.get(), info.getContainerName());

          // Update the container map with count + 1 if the key exists or
          // update the map with 1. Since this is a concurrentMap the
          // computation and update is atomic.
          containerCountMap.merge(info.getContainerName(), 1, Integer::sum);
        }
      }
    };
  }

  /**
   * Filter the containers based on specific rules.
   *
   * @param predicate -- Predicate to filter by
   * @return A list of map entries.
   */
  public List<Map.Entry<String, Integer>> filterContainer(
      Predicate<Map.Entry<String, Integer>> predicate) {
    return containerCountMap.entrySet().stream()
        .filter(predicate).collect(Collectors.toList());
  }

  /**
   * Used only for testing; calling this will abort container report
   * processing. This is a very dangerous call and should not be made by any
   * user.
   */
  @VisibleForTesting
  public void setDoneProcessing() {
    nodeProcessed.set(nodeCount.get());
  }

  /**
   * Returns the pool name.
   *
   * @return Name of the pool.
   */
  String getPoolName() {
    return pool.getPoolName();
  }

  /**
   * Current status of computing the replication status.
   */
  public enum ProgressStatus {
    InProgress, Done, Error
  }
}
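filterContainer is the hook the replication logic is expected to use once a pool's reports are in: the map holds, per container name, how many datanodes in the pool reported it. A hedged sketch; the fixed replication factor of 3 is an assumption for illustration and not part of the patch:

import java.util.List;
import java.util.Map;
import org.apache.hadoop.ozone.scm.container.replication.InProgressPool;

public class ReplicationCheckSketch {
  // Illustrative policy knob; the real policy is not defined in this patch.
  private static final int REPLICATION_FACTOR = 3;

  public static void report(InProgressPool pool) {
    // Containers reported by fewer than REPLICATION_FACTOR datanodes.
    List<Map.Entry<String, Integer>> underReplicated =
        pool.filterContainer(e -> e.getValue() < REPLICATION_FACTOR);
    // Containers reported by more than REPLICATION_FACTOR datanodes.
    List<Map.Entry<String, Integer>> overReplicated =
        pool.filterContainer(e -> e.getValue() > REPLICATION_FACTOR);
    System.out.println("under-replicated: " + underReplicated.size()
        + ", over-replicated: " + overReplicated.size());
  }
}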
PeriodicPool.java (new file)
@@ -0,0 +1,119 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.hadoop.ozone.scm.container.replication;

import java.util.concurrent.atomic.AtomicLong;

/**
 * Periodic pool is a pool with a time stamp; this allows us to process pools
 * based on a cyclic clock.
 */
public class PeriodicPool implements Comparable<PeriodicPool> {
  private final String poolName;
  private long lastProcessedTime;
  private AtomicLong totalProcessedCount;

  /**
   * Constructs a periodic pool.
   *
   * @param poolName - Name of the pool
   */
  public PeriodicPool(String poolName) {
    this.poolName = poolName;
    lastProcessedTime = 0;
    totalProcessedCount = new AtomicLong(0);
  }

  /**
   * Get pool Name.
   * @return PoolName
   */
  public String getPoolName() {
    return poolName;
  }

  /**
   * Compares this object with the specified object for order. Returns a
   * negative integer, zero, or a positive integer as this object is less
   * than, equal to, or greater than the specified object.
   *
   * @param o the object to be compared.
   * @return a negative integer, zero, or a positive integer as this object is
   * less than, equal to, or greater than the specified object.
   * @throws NullPointerException if the specified object is null
   * @throws ClassCastException if the specified object's type prevents it
   * from being compared to this object.
   */
  @Override
  public int compareTo(PeriodicPool o) {
    return Long.compare(this.lastProcessedTime, o.lastProcessedTime);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }

    PeriodicPool that = (PeriodicPool) o;

    return poolName.equals(that.poolName);
  }

  @Override
  public int hashCode() {
    return poolName.hashCode();
  }

  /**
   * Returns the total number of times we have processed this pool.
   *
   * @return processed count.
   */
  public long getTotalProcessedCount() {
    return totalProcessedCount.get();
  }

  /**
   * Gets the last time we processed this pool.
   * @return time in milliseconds
   */
  public long getLastProcessedTime() {
    return this.lastProcessedTime;
  }

  /**
   * Sets the last processed time.
   *
   * @param lastProcessedTime - Long in milliseconds.
   */
  public void setLastProcessedTime(long lastProcessedTime) {
    this.lastProcessedTime = lastProcessedTime;
  }

  /*
   * Increments the total processed count.
   */
  public void incTotalProcessedCount() {
    this.totalProcessedCount.incrementAndGet();
  }
}
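Because compareTo orders pools by lastProcessedTime, a PriorityQueue of PeriodicPool always hands back the pool that has waited longest, which is exactly how ContainerReplicationManager picks its next pool. A small sketch (the pool names and timestamps are illustrative):

import java.util.PriorityQueue;
import org.apache.hadoop.ozone.scm.container.replication.PeriodicPool;

public class PoolOrderingSketch {
  public static void main(String[] args) {
    PriorityQueue<PeriodicPool> queue = new PriorityQueue<>();
    PeriodicPool a = new PeriodicPool("pool-a");
    PeriodicPool b = new PeriodicPool("pool-b");
    a.setLastProcessedTime(2000L); // processed more recently
    b.setLastProcessedTime(1000L); // has waited longer
    queue.add(a);
    queue.add(b);
    // The least-recently-processed pool comes out first: prints "pool-b".
    System.out.println(queue.poll().getPoolName());
  }
}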
package-info.java (org.apache.hadoop.ozone.scm.container.replication, new file)
@@ -0,0 +1,23 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.hadoop.ozone.scm.container.replication;
/*
 This package contains routines that manage replication of a container. This
 relies on container reports to understand the replication level of a
 container - UnderReplicated, Replicated, OverReplicated -- and manages the
 replication level based on that.
 */
CommandQueue.java (org.apache.hadoop.ozone.scm.node)
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p>
  *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,8 +17,11 @@
  */
 package org.apache.hadoop.ozone.scm.node;

+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;

 import java.util.HashMap;
 import java.util.LinkedList;
@@ -35,18 +38,43 @@ import java.util.concurrent.locks.ReentrantLock;
  * there where queued.
  */
 public class CommandQueue {
-  private final Map<DatanodeID, List<SCMCommand>> commandMap;
-  private final Lock lock;
-  // This map is used as default return value.
+  // This list is used as default return value.
   private static final List<SCMCommand> DEFAULT_LIST = new LinkedList<>();
+  private final Map<DatanodeID, Commands> commandMap;
+  private final Lock lock;
+  private long commandsInQueue;
+
+  /**
+   * Returns number of commands in queue.
+   * @return Command Count.
+   */
+  public long getCommandsInQueue() {
+    return commandsInQueue;
+  }

   /**
    * Constructs a Command Queue.
+   * TODO : Add a flusher thread that throws away commands older than a
+   * certain time period.
    */
   public CommandQueue() {
     commandMap = new HashMap<>();
     lock = new ReentrantLock();
+    commandsInQueue = 0;
+  }
+
+  /**
+   * This function is used only for test purposes.
+   */
+  @VisibleForTesting
+  public void clear() {
+    lock.lock();
+    try {
+      commandMap.clear();
+      commandsInQueue = 0;
+    } finally {
+      lock.unlock();
+    }
   }

   /**
@@ -61,8 +89,15 @@ public class CommandQueue {
   List<SCMCommand> getCommand(final DatanodeID datanodeID) {
     lock.lock();
     try {
-      List<SCMCommand> cmds = commandMap.remove(datanodeID);
-      return cmds == null ? DEFAULT_LIST : cmds;
+      Commands cmds = commandMap.remove(datanodeID);
+      List<SCMCommand> cmdList = null;
+      if (cmds != null) {
+        cmdList = cmds.getCommands();
+        commandsInQueue -= cmdList.size() > 0 ? cmdList.size() : 0;
+        // A post condition really.
+        Preconditions.checkState(commandsInQueue >= 0);
+      }
+      return cmds == null ? DEFAULT_LIST : cmdList;
     } finally {
       lock.unlock();
     }
@@ -74,19 +109,82 @@ public class CommandQueue {
    * @param datanodeID DatanodeID
    * @param command - Command
    */
-  void addCommand(final DatanodeID datanodeID, final SCMCommand command) {
+  public void addCommand(final DatanodeID datanodeID, final SCMCommand
+      command) {
     lock.lock();
     try {
       if (commandMap.containsKey(datanodeID)) {
         commandMap.get(datanodeID).add(command);
       } else {
-        LinkedList<SCMCommand> newList = new LinkedList<>();
-        newList.add(command);
-        commandMap.put(datanodeID, newList);
+        commandMap.put(datanodeID, new Commands(command));
       }
+      commandsInQueue++;
     } finally {
       lock.unlock();
     }
   }
+
+  /**
+   * Class that stores commands for a datanode.
+   */
+  private static class Commands {
+    private long updateTime;
+    private long readTime;
+    private List<SCMCommand> commands;
+
+    /**
+     * Constructs a Commands class.
+     */
+    Commands() {
+      commands = new LinkedList<>();
+      updateTime = 0;
+      readTime = 0;
+    }
+
+    /**
+     * Creates the object and populates it with the command.
+     * @param command command to add to queue.
+     */
+    Commands(SCMCommand command) {
+      this();
+      this.add(command);
+    }
+
+    /**
+     * Gets the last time the commands for this node were updated.
+     * @return Time stamp
+     */
+    public long getUpdateTime() {
+      return updateTime;
+    }
+
+    /**
+     * Gets the last read time.
+     * @return last time when these commands were read from this queue.
+     */
+    public long getReadTime() {
+      return readTime;
+    }
+
+    /**
+     * Adds a command to the list.
+     *
+     * @param command SCMCommand
+     */
+    public void add(SCMCommand command) {
+      this.commands.add(command);
+      updateTime = Time.monotonicNow();
+    }
+
+    /**
+     * Returns the commands for this datanode.
+     * @return command list.
+     */
+    public List<SCMCommand> getCommands() {
+      List<SCMCommand> temp = this.commands;
+      this.commands = new LinkedList<>();
+      readTime = Time.monotonicNow();
+      return temp;
+    }
+  }
 }
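A hedged sketch of the queue's new accounting behaviour: commands accumulate per datanode and are handed out, and cleared, as one batch when that datanode is next served. The sketch lives in the same package because getCommand() is package-private, and the DatanodeID instance is assumed to be supplied by the caller:

package org.apache.hadoop.ozone.scm.node;

import java.util.List;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand;

public class CommandQueueSketch {
  public static void demo(DatanodeID datanode) {
    CommandQueue queue = new CommandQueue();
    queue.addCommand(datanode, SendContainerCommand.newBuilder().build());
    queue.addCommand(datanode, SendContainerCommand.newBuilder().build());
    // Two commands are now pending for this datanode.
    assert queue.getCommandsInQueue() == 2;

    // Reading drains the per-datanode batch and decrements the counter.
    List<SCMCommand> batch = queue.getCommand(datanode);
    assert batch.size() == 2;
    assert queue.getCommandsInQueue() == 0;

    // A second read returns the shared empty DEFAULT_LIST.
    assert queue.getCommand(datanode).isEmpty();
  }
}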
NodeManager.java
@@ -109,7 +109,8 @@ public interface NodeManager extends StorageContainerNodeProtocol,
   enum NODESTATE {
     HEALTHY,
     STALE,
-    DEAD
+    DEAD,
+    UNKNOWN
   }

   /**
@@ -137,4 +138,11 @@ public interface NodeManager extends StorageContainerNodeProtocol,
    */
   @VisibleForTesting
   boolean waitForHeartbeatProcessed();
+
+  /**
+   * Returns the node state of a specific node.
+   * @param id - DatanodeID
+   * @return Healthy/Stale/Dead.
+   */
+  NODESTATE getNodeState(DatanodeID id);
 }
SCMNodeManager.java
@@ -376,6 +376,12 @@ public class SCMNodeManager
       return staleNodeCount.get();
     case DEAD:
       return deadNodeCount.get();
+    case UNKNOWN:
+      // Some nodes can be in transit between the other states, so returning
+      // a count for UNKNOWN is not possible. The state exists to deal with
+      // the fact that this information is not always consistent.
+      return 0;
     default:
       throw new IllegalArgumentException("Unknown node state requested.");
     }
@@ -392,6 +398,37 @@ public class SCMNodeManager
     return lastHBcheckFinished != 0;
   }

+  /**
+   * Returns the node state of a specific node.
+   *
+   * @param id - DatanodeID
+   * @return Healthy/Stale/Dead/Unknown.
+   */
+  @Override
+  public NODESTATE getNodeState(DatanodeID id) {
+    // There is a subtle race condition here, hence we also support
+    // NODESTATE.UNKNOWN. It is possible that just before we check the
+    // healthyNodes map, we have removed the node from the healthy list but
+    // have not yet added it to the stale nodes list.
+    // We could fix that by adding the node to the stale list before removing
+    // it, but then the node would briefly be in two states. Instead we just
+    // deal with the possibility of getting a state called unknown.
+
+    if (healthyNodes.containsKey(id.getDatanodeUuid())) {
+      return NODESTATE.HEALTHY;
+    }
+
+    if (staleNodes.containsKey(id.getDatanodeUuid())) {
+      return NODESTATE.STALE;
+    }
+
+    if (deadNodes.containsKey(id.getDatanodeUuid())) {
+      return NODESTATE.DEAD;
+    }
+
+    return NODESTATE.UNKNOWN;
+  }
+
   /**
    * This is the real worker thread that processes the HB queue. We do the
    * following things in this thread.
StorageContainerDatanodeProtocol.proto (ContainerReportsProto message)
@@ -110,8 +110,9 @@ message ContainerReportsProto {
     fullReport = 0;
     deltaReport = 1;
   }
-  repeated ContainerInfo reports = 1;
-  required reportType type = 2;
+  required DatanodeIDProto datanodeID = 1;
+  repeated ContainerInfo reports = 2;
+  required reportType type = 3;
 }
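Because datanodeID is now a required field and the tag numbers shifted, anything that builds a ContainerReportsProto must set the reporting datanode explicitly. A hedged Java sketch using the generated builder, mirroring the calls made by the test utility below:

import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.proto
    .StorageContainerDatanodeProtocolProtos.ContainerInfo;
import org.apache.hadoop.ozone.protocol.proto
    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;

public class ReportBuilderSketch {
  // The DatanodeID is assumed to be the reporting datanode's own identity.
  public static ContainerReportsProto build(DatanodeID datanode,
      ContainerInfo info) {
    return ContainerReportsProto.newBuilder()
        .setDatanodeID(datanode.getProtoBufMessage()) // new required field (tag 1)
        .addReports(info)                             // now tag 2
        .setType(ContainerReportsProto.reportType.fullReport) // now tag 3
        .build();
  }
}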
ReplicationDatanodeStateManager.java (new test utility)
@@ -0,0 +1,92 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.hadoop.ozone.container.TestUtils;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerInfo;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.scm.node.NodePoolManager;

import java.util.LinkedList;
import java.util.List;
import java.util.Random;

/**
 * This class manages the state of a datanode
 * in conjunction with the node pool and node managers.
 */
public class ReplicationDatanodeStateManager {
  private final NodeManager nodeManager;
  private final NodePoolManager poolManager;
  private final Random r;

  /**
   * The datanode state Manager.
   *
   * @param nodeManager - Node Manager.
   * @param poolManager - Node Pool Manager.
   */
  public ReplicationDatanodeStateManager(NodeManager nodeManager,
      NodePoolManager poolManager) {
    this.nodeManager = nodeManager;
    this.poolManager = poolManager;
    r = new Random();
  }

  /**
   * Get Container Reports as if they are from datanodes in the cluster.
   * @param containerName - Container Name.
   * @param poolName - Pool Name.
   * @param dataNodeCount - Datanode Count.
   * @return List of Container Reports.
   */
  public List<ContainerReportsProto> getContainerReport(String containerName,
      String poolName, int dataNodeCount) {
    List<ContainerReportsProto> containerList = new LinkedList<>();
    List<DatanodeID> nodesInPool = poolManager.getNodes(poolName);

    if (nodesInPool == null) {
      return containerList;
    }

    if (nodesInPool.size() < dataNodeCount) {
      throw new IllegalStateException("Not enough datanodes to create " +
          "required container reports");
    }

    while (containerList.size() < dataNodeCount && nodesInPool.size() > 0) {
      DatanodeID id = nodesInPool.get(r.nextInt(nodesInPool.size()));
      nodesInPool.remove(id);
      // We return container reports only for nodes that are healthy.
      if (nodeManager.getNodeState(id) == NodeManager.NODESTATE.HEALTHY) {
        ContainerInfo info = ContainerInfo.newBuilder()
            .setContainerName(containerName)
            .setFinalhash(DigestUtils.sha256Hex(containerName))
            .build();
        ContainerReportsProto containerReport = ContainerReportsProto
            .newBuilder().addReports(info)
            .setDatanodeID(id.getProtoBufMessage())
            .setType(ContainerReportsProto.reportType.fullReport)
            .build();
        containerList.add(containerReport);
      }
    }
    return containerList;
  }
}
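A hypothetical test-style use of this utility: generate reports for one container as seen by three healthy datanodes in a pool and feed them to the replication manager. The container and pool names are illustrative, and both managers are assumed to have been built by the test with the mocks added in this patch:

import java.util.List;
import org.apache.hadoop.ozone.container.TestUtils.ReplicationDatanodeStateManager;
import org.apache.hadoop.ozone.protocol.proto
    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.ozone.scm.container.replication.ContainerReplicationManager;

public class ReportSimulationSketch {
  public static void simulate(ReplicationDatanodeStateManager stateManager,
      ContainerReplicationManager replicationManager) {
    List<ContainerReportsProto> reports =
        stateManager.getContainerReport("container-1", "pool-1", 3);
    // Each simulated report is routed by the manager to the in-progress pool.
    reports.forEach(replicationManager::handleContainerReport);
  }
}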
@ -0,0 +1,315 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.container.TestUtils;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
|
||||||
|
import org.apache.hadoop.ozone.protocol.VersionResponse;
|
||||||
|
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
|
||||||
|
import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric;
|
||||||
|
import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat;
|
||||||
|
import org.apache.hadoop.ozone.scm.node.NodeManager;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A Node Manager to test replication.
|
||||||
|
*/
|
||||||
|
public class ReplicationNodeManagerMock implements NodeManager {
|
||||||
|
private final Map<DatanodeID, NODESTATE> nodeStateMap;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A list of Datanodes and current states.
|
||||||
|
* @param nodeState A node state map.
|
||||||
|
*/
|
||||||
|
public ReplicationNodeManagerMock(Map<DatanodeID, NODESTATE> nodeState) {
|
||||||
|
Preconditions.checkNotNull(nodeState);
|
||||||
|
nodeStateMap = nodeState;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the minimum number of nodes to get out of chill mode.
|
||||||
|
*
|
||||||
|
* @return int
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public int getMinimumChillModeNodes() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reports if we have exited out of chill mode by discovering enough nodes.
|
||||||
|
*
|
||||||
|
* @return True if we are out of Node layer chill mode, false otherwise.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean isOutOfNodeChillMode() {
|
||||||
|
return !nodeStateMap.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a chill mode status string.
|
||||||
|
*
|
||||||
|
* @return String
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public String getChillModeStatus() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the status of manual chill mode flag.
|
||||||
|
*
|
||||||
|
* @return true if forceEnterChillMode has been called, false if
|
||||||
|
* forceExitChillMode or status is not set. eg. clearChillModeFlag.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean isInManualChillMode() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the number of data nodes that in all states.
|
||||||
|
*
|
||||||
|
* @return A state to number of nodes that in this state mapping
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public Map<String, Integer> getNodeCount() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes a data node from the management of this Node Manager.
|
||||||
|
*
|
||||||
|
* @param node - DataNode.
|
||||||
|
* @throws UnregisteredNodeException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void removeNode(DatanodeID node) throws UnregisteredNodeException {
|
||||||
|
nodeStateMap.remove(node);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets all Live Datanodes that is currently communicating with SCM.
|
||||||
|
*
|
||||||
|
* @param nodestate - State of the node
|
||||||
|
* @return List of Datanodes that are Heartbeating SCM.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public List<DatanodeID> getNodes(NODESTATE nodestate) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the Number of Datanodes that are communicating with SCM.
|
||||||
|
*
|
||||||
|
* @param nodestate - State of the node
|
||||||
|
* @return int -- count
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public int getNodeCount(NODESTATE nodestate) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get all datanodes known to SCM.
|
||||||
|
*
|
||||||
|
* @return List of DatanodeIDs known to SCM.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public List<DatanodeID> getAllNodes() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Chill mode is the period when node manager waits for a minimum
|
||||||
|
* configured number of datanodes to report in. This is called chill mode
|
||||||
|
* to indicate the period before the node manager gets into action.
|
||||||
|
* <p>
|
||||||
|
* Forcefully exits the chill mode, even if we have not met the minimum
|
||||||
|
* criteria of the nodes reporting in.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void forceExitChillMode() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Forcefully enters chill mode, even if all minimum node conditions are met.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void forceEnterChillMode() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clears the manual chill mode flag.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void clearChillModeFlag() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the aggregated node stats.
|
||||||
|
*
|
||||||
|
* @return the aggregated node stats.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public SCMNodeStat getStats() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return a map of node stats.
|
||||||
|
*
|
||||||
|
* @return a map of individual node stats (live/stale but not dead).
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public Map<String, SCMNodeStat> getNodeStats() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the node stat of the specified datanode.
|
||||||
|
*
|
||||||
|
* @param datanodeID - datanode ID.
|
||||||
|
* @return node stat if it is live/stale, null if it is dead or doesn't exist.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public SCMNodeMetric getNodeStat(DatanodeID datanodeID) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Waits until the heartbeat has been processed by the NodeManager.
|
||||||
|
*
|
||||||
|
* @return true if heartbeat has been processed.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean waitForHeartbeatProcessed() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the node state of a specific node.
|
||||||
|
*
|
||||||
|
* @param id - DatanodeID
|
||||||
|
* @return Healthy/Stale/Dead.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public NODESTATE getNodeState(DatanodeID id) {
|
||||||
|
return nodeStateMap.get(id);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Closes this stream and releases any system resources associated
|
||||||
|
* with it. If the stream is already closed then invoking this
|
||||||
|
* method has no effect.
|
||||||
|
* <p>
|
||||||
|
* <p> As noted in {@link AutoCloseable#close()}, cases where the
|
||||||
|
* close may fail require careful attention. It is strongly advised
|
||||||
|
* to relinquish the underlying resources and to internally
|
||||||
|
* <em>mark</em> the {@code Closeable} as closed, prior to throwing
|
||||||
|
* the {@code IOException}.
|
||||||
|
*
|
||||||
|
* @throws IOException if an I/O error occurs
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When an object implementing interface <code>Runnable</code> is used
|
||||||
|
* to create a thread, starting the thread causes the object's
|
||||||
|
* <code>run</code> method to be called in that separately executing
|
||||||
|
* thread.
|
||||||
|
* <p>
|
||||||
|
* The general contract of the method <code>run</code> is that it may
|
||||||
|
* take any action whatsoever.
|
||||||
|
*
|
||||||
|
* @see Thread#run()
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the version info from SCM.
|
||||||
|
*
|
||||||
|
* @param versionRequest - version Request.
|
||||||
|
* @return - returns SCM version info and other required information needed by
|
||||||
|
* datanode.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public VersionResponse getVersion(SCMVersionRequestProto versionRequest) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register the node if the node finds that it is not registered with any SCM.
|
||||||
|
*
|
||||||
|
* @param datanodeID - DatanodeID sent with node info; the datanode UUID may be
|
||||||
|
* empty, in which case the server assigns and returns one for the node.
|
||||||
|
* @return SCMCommand
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public SCMCommand register(DatanodeID datanodeID) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send heartbeat to indicate the datanode is alive and doing well.
|
||||||
|
*
|
||||||
|
* @param datanodeID - Datanode ID.
|
||||||
|
* @param nodeReport - node report.
|
||||||
|
* @return list of SCMCommands issued in response to the heartbeat.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public List<SCMCommand> sendHeartbeat(DatanodeID datanodeID,
|
||||||
|
SCMNodeReport nodeReport) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clears all nodes from the node manager.
|
||||||
|
*/
|
||||||
|
public void clearMap() {
|
||||||
|
this.nodeStateMap.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds a node to the existing Node manager. This is used only for test
|
||||||
|
* purposes.
|
||||||
|
* @param id - DatanodeID
|
||||||
|
* @param state - State to put the node into.
|
||||||
|
*/
|
||||||
|
public void addNode(DatanodeID id, NODESTATE state) {
|
||||||
|
nodeStateMap.put(id, state);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
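A minimal usage sketch of how a test might drive this mock, assuming it runs inside the existing test sources; the variable names are illustrative, while SCMTestUtils.getDatanodeID() and the NODESTATE values are the ones the tests below rely on:

    // Seed the mock with one healthy datanode and query it back.
    Map<DatanodeID, NodeManager.NODESTATE> states = new HashMap<>();
    ReplicationNodeManagerMock mock = new ReplicationNodeManagerMock(states);
    DatanodeID dn = SCMTestUtils.getDatanodeID();
    mock.addNode(dn, NodeManager.NODESTATE.HEALTHY);
    assert mock.getNodeState(dn) == NodeManager.NODESTATE.HEALTHY;
    assert mock.isOutOfNodeChillMode();   // non-empty state map => out of chill mode
    mock.clearMap();                      // reset between test cases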
|
|
@ -0,0 +1,132 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.container.TestUtils;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
|
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
|
||||||
|
import org.apache.hadoop.ozone.scm.node.NodePoolManager;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pool Manager replication mock.
|
||||||
|
*/
|
||||||
|
public class ReplicationNodePoolManagerMock implements NodePoolManager {
|
||||||
|
|
||||||
|
private final Map<DatanodeID, String> nodeMemberShip;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A node pool manager for testing.
|
||||||
|
*/
|
||||||
|
public ReplicationNodePoolManagerMock() {
|
||||||
|
nodeMemberShip = new HashMap<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a node to a node pool.
|
||||||
|
*
|
||||||
|
* @param pool - name of the node pool.
|
||||||
|
* @param node - data node.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void addNode(String pool, DatanodeID node) {
|
||||||
|
nodeMemberShip.put(node, pool);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove a node from a node pool.
|
||||||
|
*
|
||||||
|
* @param pool - name of the node pool.
|
||||||
|
* @param node - data node.
|
||||||
|
* @throws SCMException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void removeNode(String pool, DatanodeID node) throws SCMException {
|
||||||
|
nodeMemberShip.remove(node);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a list of known node pools.
|
||||||
|
*
|
||||||
|
* @return a list of known node pool names, or an empty list if no node pool
|
||||||
|
* is defined.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public List<String> getNodePools() {
|
||||||
|
Set<String> poolSet = new HashSet<>();
|
||||||
|
for (Map.Entry<DatanodeID, String> entry : nodeMemberShip.entrySet()) {
|
||||||
|
poolSet.add(entry.getValue());
|
||||||
|
}
|
||||||
|
return new ArrayList<>(poolSet);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get all nodes of a node pool given the name of the node pool.
|
||||||
|
*
|
||||||
|
* @param pool - name of the node pool.
|
||||||
|
* @return a list of datanode ids or an empty list if the node pool was not
|
||||||
|
* found.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public List<DatanodeID> getNodes(String pool) {
|
||||||
|
Set<DatanodeID> datanodeSet = new HashSet<>();
|
||||||
|
for (Map.Entry<DatanodeID, String> entry : nodeMemberShip.entrySet()) {
|
||||||
|
if (entry.getValue().equals(pool)) {
|
||||||
|
datanodeSet.add(entry.getKey());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return new ArrayList<>(datanodeSet);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the node pool name if the node has been added to a node pool.
|
||||||
|
*
|
||||||
|
* @param datanodeID - datanode ID.
|
||||||
|
* @return node pool name if it has been assigned. null if the node has not
|
||||||
|
* been assigned to any node pool yet.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public String getNodePool(DatanodeID datanodeID) {
|
||||||
|
return nodeMemberShip.get(datanodeID);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Closes this stream and releases any system resources associated
|
||||||
|
* with it. If the stream is already closed then invoking this
|
||||||
|
* method has no effect.
|
||||||
|
* <p>
|
||||||
|
* <p> As noted in {@link AutoCloseable#close()}, cases where the
|
||||||
|
* close may fail require careful attention. It is strongly advised
|
||||||
|
* to relinquish the underlying resources and to internally
|
||||||
|
* <em>mark</em> the {@code Closeable} as closed, prior to throwing
|
||||||
|
* the {@code IOException}.
|
||||||
|
*
|
||||||
|
* @throws IOException if an I/O error occurs
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
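A short sketch of the membership semantics this mock implies: the backing map is keyed by the datanode, so a node belongs to at most one pool and re-adding it simply moves it. The pool names and variables below are illustrative:

    ReplicationNodePoolManagerMock pools = new ReplicationNodePoolManagerMock();
    DatanodeID dn = SCMTestUtils.getDatanodeID();
    pools.addNode("PoolA", dn);
    assert "PoolA".equals(pools.getNodePool(dn));
    pools.addNode("PoolB", dn);                  // moves dn out of PoolA
    assert pools.getNodes("PoolA").isEmpty();
    assert pools.getNodePools().contains("PoolB");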
|
|
@ -0,0 +1,18 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.container.TestUtils;
|
||||||
|
// Helper classes for ozone and container tests.
|
|
@ -24,6 +24,7 @@ import org.apache.hadoop.io.retry.RetryPolicies;
|
||||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
|
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||||
import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
|
import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
|
||||||
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
|
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
|
||||||
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
|
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
|
||||||
|
@ -138,8 +139,7 @@ public final class SCMTestUtils {
|
||||||
try (ServerSocket socket = new ServerSocket(0)) {
|
try (ServerSocket socket = new ServerSocket(0)) {
|
||||||
socket.setReuseAddress(true);
|
socket.setReuseAddress(true);
|
||||||
int port = socket.getLocalPort();
|
int port = socket.getLocalPort();
|
||||||
String addr = InetAddress.getLoopbackAddress().getHostAddress()
|
String addr = InetAddress.getLoopbackAddress().getHostAddress();
|
||||||
.toString();
|
|
||||||
return new InetSocketAddress(addr, port);
|
return new InetSocketAddress(addr, port);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -148,6 +148,10 @@ public final class SCMTestUtils {
|
||||||
return new Configuration();
|
return new Configuration();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static OzoneConfiguration getOzoneConf() {
|
||||||
|
return new OzoneConfiguration();
|
||||||
|
}
|
||||||
|
|
||||||
public static DatanodeID getDatanodeID(SCMNodeManager nodeManager) {
|
public static DatanodeID getDatanodeID(SCMNodeManager nodeManager) {
|
||||||
|
|
||||||
return getDatanodeID(nodeManager, UUID.randomUUID().toString());
|
return getDatanodeID(nodeManager, UUID.randomUUID().toString());
|
||||||
|
|
|
@ -61,6 +61,8 @@ import java.net.InetSocketAddress;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
||||||
|
import static org.apache.hadoop.ozone.container.common.SCMTestUtils
|
||||||
|
.getDatanodeID;
|
||||||
import static org.apache.hadoop.ozone.protocol.proto
|
import static org.apache.hadoop.ozone.protocol.proto
|
||||||
.StorageContainerDatanodeProtocolProtos.ReportState.states
|
.StorageContainerDatanodeProtocolProtos.ReportState.states
|
||||||
.noContainerReports;
|
.noContainerReports;
|
||||||
|
@ -193,7 +195,7 @@ public class TestEndPoint {
|
||||||
public void testRegister() throws Exception {
|
public void testRegister() throws Exception {
|
||||||
String[] scmAddressArray = new String[1];
|
String[] scmAddressArray = new String[1];
|
||||||
scmAddressArray[0] = serverAddress.toString();
|
scmAddressArray[0] = serverAddress.toString();
|
||||||
DatanodeID nodeToRegister = SCMTestUtils.getDatanodeID();
|
DatanodeID nodeToRegister = getDatanodeID();
|
||||||
try (EndpointStateMachine rpcEndPoint =
|
try (EndpointStateMachine rpcEndPoint =
|
||||||
SCMTestUtils.createEndpoint(
|
SCMTestUtils.createEndpoint(
|
||||||
SCMTestUtils.getConf(), serverAddress, 1000)) {
|
SCMTestUtils.getConf(), serverAddress, 1000)) {
|
||||||
|
@ -218,7 +220,7 @@ public class TestEndPoint {
|
||||||
if (!clearContainerID) {
|
if (!clearContainerID) {
|
||||||
ContainerNodeIDProto containerNodeID = ContainerNodeIDProto.newBuilder()
|
ContainerNodeIDProto containerNodeID = ContainerNodeIDProto.newBuilder()
|
||||||
.setClusterID(UUID.randomUUID().toString())
|
.setClusterID(UUID.randomUUID().toString())
|
||||||
.setDatanodeID(SCMTestUtils.getDatanodeID().getProtoBufMessage())
|
.setDatanodeID(getDatanodeID().getProtoBufMessage())
|
||||||
.build();
|
.build();
|
||||||
endpointTask.setContainerNodeIDProto(containerNodeID);
|
endpointTask.setContainerNodeIDProto(containerNodeID);
|
||||||
}
|
}
|
||||||
|
@ -272,7 +274,7 @@ public class TestEndPoint {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testHeartbeat() throws Exception {
|
public void testHeartbeat() throws Exception {
|
||||||
DatanodeID dataNode = SCMTestUtils.getDatanodeID();
|
DatanodeID dataNode = getDatanodeID();
|
||||||
try (EndpointStateMachine rpcEndPoint =
|
try (EndpointStateMachine rpcEndPoint =
|
||||||
SCMTestUtils.createEndpoint(SCMTestUtils.getConf(),
|
SCMTestUtils.createEndpoint(SCMTestUtils.getConf(),
|
||||||
serverAddress, 1000)) {
|
serverAddress, 1000)) {
|
||||||
|
@ -299,7 +301,7 @@ public class TestEndPoint {
|
||||||
scmAddress, rpcTimeout)) {
|
scmAddress, rpcTimeout)) {
|
||||||
ContainerNodeIDProto containerNodeID = ContainerNodeIDProto.newBuilder()
|
ContainerNodeIDProto containerNodeID = ContainerNodeIDProto.newBuilder()
|
||||||
.setClusterID(UUID.randomUUID().toString())
|
.setClusterID(UUID.randomUUID().toString())
|
||||||
.setDatanodeID(SCMTestUtils.getDatanodeID().getProtoBufMessage())
|
.setDatanodeID(getDatanodeID().getProtoBufMessage())
|
||||||
.build();
|
.build();
|
||||||
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.HEARTBEAT);
|
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.HEARTBEAT);
|
||||||
|
|
||||||
|
@ -365,6 +367,8 @@ public class TestEndPoint {
|
||||||
reportsBuilder.addReports(getRandomContainerReport()
|
reportsBuilder.addReports(getRandomContainerReport()
|
||||||
.getProtoBufMessage());
|
.getProtoBufMessage());
|
||||||
}
|
}
|
||||||
|
reportsBuilder.setDatanodeID(SCMTestUtils.getDatanodeID()
|
||||||
|
.getProtoBufMessage());
|
||||||
reportsBuilder.setType(StorageContainerDatanodeProtocolProtos
|
reportsBuilder.setType(StorageContainerDatanodeProtocolProtos
|
||||||
.ContainerReportsProto.reportType.fullReport);
|
.ContainerReportsProto.reportType.fullReport);
|
||||||
return reportsBuilder.build();
|
return reportsBuilder.build();
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.common.impl;
|
||||||
|
|
||||||
import org.apache.commons.codec.binary.Hex;
|
import org.apache.commons.codec.binary.Hex;
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.commons.lang3.RandomStringUtils;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
||||||
|
@ -354,7 +355,7 @@ public class TestContainerPersistence {
|
||||||
pipeline.setContainerName(containerName);
|
pipeline.setContainerName(containerName);
|
||||||
ContainerData cData = new ContainerData(containerName);
|
ContainerData cData = new ContainerData(containerName);
|
||||||
cData.addMetadata("VOLUME", "shire");
|
cData.addMetadata("VOLUME", "shire");
|
||||||
cData.addMetadata("owner)", "bilbo");
|
cData.addMetadata("owner", "bilbo");
|
||||||
if(!containerManager.getContainerMap()
|
if(!containerManager.getContainerMap()
|
||||||
.containsKey(containerName)) {
|
.containsKey(containerName)) {
|
||||||
containerManager.createContainer(pipeline, cData);
|
containerManager.createContainer(pipeline, cData);
|
||||||
|
@ -773,7 +774,7 @@ public class TestContainerPersistence {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testListKey() throws Exception {
|
public void testListKey() throws Exception {
|
||||||
String containerName = "c-0";
|
String containerName = "c0" + RandomStringUtils.randomAscii(10);
|
||||||
Pipeline pipeline = createSingleNodePipeline(containerName);
|
Pipeline pipeline = createSingleNodePipeline(containerName);
|
||||||
List<String> expectedKeys = new ArrayList<String>();
|
List<String> expectedKeys = new ArrayList<String>();
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
|
|
|
@ -0,0 +1,259 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.container.replication;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
|
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||||
|
import org.apache.hadoop.ozone.container.TestUtils
|
||||||
|
.ReplicationDatanodeStateManager;
|
||||||
|
import org.apache.hadoop.ozone.container.TestUtils.ReplicationNodeManagerMock;
|
||||||
|
import org.apache.hadoop.ozone.container.TestUtils
|
||||||
|
.ReplicationNodePoolManagerMock;
|
||||||
|
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
|
||||||
|
import org.apache.hadoop.ozone.scm.container.replication
|
||||||
|
.ContainerReplicationManager;
|
||||||
|
import org.apache.hadoop.ozone.scm.container.replication.InProgressPool;
|
||||||
|
import org.apache.hadoop.ozone.scm.node.CommandQueue;
|
||||||
|
import org.apache.hadoop.ozone.scm.node.NodeManager;
|
||||||
|
import org.apache.hadoop.ozone.scm.node.NodePoolManager;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
|
||||||
|
import org.apache.log4j.Level;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.scm.ScmConfigKeys
|
||||||
|
.OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_SECONDS;
|
||||||
|
import static org.apache.ratis.shaded.com.google.common.util.concurrent
|
||||||
|
.Uninterruptibles.sleepUninterruptibly;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests for the container manager.
|
||||||
|
*/
|
||||||
|
public class TestContainerReplicationManager {
|
||||||
|
final static String POOL_NAME_TEMPLATE = "Pool%d";
|
||||||
|
static final int MAX_DATANODES = 72;
|
||||||
|
static final int POOL_SIZE = 24;
|
||||||
|
static final int POOL_COUNT = 3;
|
||||||
|
private LogCapturer logCapturer = LogCapturer.captureLogs(
|
||||||
|
LogFactory.getLog(ContainerReplicationManager.class));
|
||||||
|
private List<DatanodeID> datanodes = new LinkedList<>();
|
||||||
|
private NodeManager nodeManager;
|
||||||
|
private NodePoolManager poolManager;
|
||||||
|
private CommandQueue commandQueue;
|
||||||
|
private ContainerReplicationManager replicationManager;
|
||||||
|
private ReplicationDatanodeStateManager datanodeStateManager;
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
logCapturer.stopCapturing();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
Map<DatanodeID, NodeManager.NODESTATE> nodeStateMap = new HashMap<>();
|
||||||
|
// We are setting up 3 pools with 24 nodes each in this cluster.
|
||||||
|
// First we create 72 Datanodes.
|
||||||
|
for (int x = 0; x < MAX_DATANODES; x++) {
|
||||||
|
DatanodeID datanode = SCMTestUtils.getDatanodeID();
|
||||||
|
datanodes.add(datanode);
|
||||||
|
nodeStateMap.put(datanode, NodeManager.NODESTATE.HEALTHY);
|
||||||
|
}
|
||||||
|
|
||||||
|
// All nodes in this cluster are healthy for the time being.
|
||||||
|
nodeManager = new ReplicationNodeManagerMock(nodeStateMap);
|
||||||
|
poolManager = new ReplicationNodePoolManagerMock();
|
||||||
|
commandQueue = new CommandQueue();
|
||||||
|
|
||||||
|
Assert.assertEquals("Max datanodes should be equal to POOL_SIZE * " +
|
||||||
|
"POOL_COUNT", POOL_COUNT * POOL_SIZE, MAX_DATANODES);
|
||||||
|
|
||||||
|
// Assign each pool a disjoint, consecutive block of POOL_SIZE datanodes.
|
||||||
|
for (int y = 1; y <= POOL_COUNT; y++) {
|
||||||
|
String poolName = String.format(POOL_NAME_TEMPLATE, y);
|
||||||
|
for (int z = 0; z < POOL_SIZE; z++) {
|
||||||
|
DatanodeID id = datanodes.get((y - 1) * POOL_SIZE + z);
|
||||||
|
poolManager.addNode(poolName, id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
OzoneConfiguration config = SCMTestUtils.getOzoneConf();
|
||||||
|
config.setInt(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_SECONDS, 1);
|
||||||
|
replicationManager = new ContainerReplicationManager(config,
|
||||||
|
nodeManager, poolManager, commandQueue);
|
||||||
|
datanodeStateManager = new ReplicationDatanodeStateManager(nodeManager,
|
||||||
|
poolManager);
|
||||||
|
// Sleep for one second to make sure all threads get time to run.
|
||||||
|
sleepUninterruptibly(1, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
/**
|
||||||
|
* Asserts that at least one pool is picked up for processing.
|
||||||
|
*/
|
||||||
|
public void testAssertPoolsAreProcessed() {
|
||||||
|
// This asserts that replication manager has started processing at least
|
||||||
|
// one pool.
|
||||||
|
Assert.assertTrue(replicationManager.getInProgressPoolCount() > 0);
|
||||||
|
|
||||||
|
// Since all datanodes are flagged as healthy in this test, for each
|
||||||
|
// datanode we must have queued a command.
|
||||||
|
Assert.assertEquals("Commands are in queue :", commandQueue
|
||||||
|
.getCommandsInQueue(), POOL_SIZE * replicationManager
|
||||||
|
.getInProgressPoolCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
/**
|
||||||
|
* This test sends container reports for 2 containers to a pool in progress.
|
||||||
|
* Asserts that we are able to find the container with a single replica and do
|
||||||
|
* not find the container with 3 replicas.
|
||||||
|
*/
|
||||||
|
public void testDetectSingleContainerReplica() throws TimeoutException,
|
||||||
|
InterruptedException {
|
||||||
|
String singleNodeContainer = "SingleNodeContainer";
|
||||||
|
String threeNodeContainer = "ThreeNodeContainer";
|
||||||
|
InProgressPool ppool = replicationManager.getInProcessPoolList().get(0);
|
||||||
|
// Only a single datanode reports that "SingleNodeContainer" exists.
|
||||||
|
List<ContainerReportsProto> clist =
|
||||||
|
datanodeStateManager.getContainerReport(singleNodeContainer,
|
||||||
|
ppool.getPool().getPoolName(), 1);
|
||||||
|
ppool.handleContainerReport(clist.get(0));
|
||||||
|
|
||||||
|
// Three nodes are going to report that ThreeNodeContainer exists.
|
||||||
|
clist = datanodeStateManager.getContainerReport(threeNodeContainer,
|
||||||
|
ppool.getPool().getPoolName(), 3);
|
||||||
|
|
||||||
|
for (ContainerReportsProto reportsProto : clist) {
|
||||||
|
ppool.handleContainerReport(reportsProto);
|
||||||
|
}
|
||||||
|
GenericTestUtils.waitFor(() -> ppool.getContainerProcessedCount() == 4,
|
||||||
|
200, 1000);
|
||||||
|
ppool.setDoneProcessing();
|
||||||
|
|
||||||
|
List<Map.Entry<String, Integer>> containers = ppool.filterContainer(p -> p
|
||||||
|
.getValue() == 1);
|
||||||
|
Assert.assertEquals(singleNodeContainer, containers.get(0).getKey());
|
||||||
|
int count = containers.get(0).getValue();
|
||||||
|
Assert.assertEquals(count, 1L);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
/**
|
||||||
|
* We create three containers: Normal, OverReplicated and WayOverReplicated
|
||||||
|
* containers. This test asserts that we are able to find the
|
||||||
|
* over-replicated containers.
|
||||||
|
*/
|
||||||
|
public void testDetectOverReplica() throws TimeoutException,
|
||||||
|
InterruptedException {
|
||||||
|
String normalContainer = "NormalContainer";
|
||||||
|
String overReplicated = "OverReplicatedContainer";
|
||||||
|
String wayOverReplicated = "WayOverReplicated";
|
||||||
|
InProgressPool ppool = replicationManager.getInProcessPoolList().get(0);
|
||||||
|
|
||||||
|
List<ContainerReportsProto> clist =
|
||||||
|
datanodeStateManager.getContainerReport(normalContainer,
|
||||||
|
ppool.getPool().getPoolName(), 3);
|
||||||
|
ppool.handleContainerReport(clist.get(0));
|
||||||
|
|
||||||
|
clist = datanodeStateManager.getContainerReport(overReplicated,
|
||||||
|
ppool.getPool().getPoolName(), 4);
|
||||||
|
|
||||||
|
for (ContainerReportsProto reportsProto : clist) {
|
||||||
|
ppool.handleContainerReport(reportsProto);
|
||||||
|
}
|
||||||
|
|
||||||
|
clist = datanodeStateManager.getContainerReport(wayOverReplicated,
|
||||||
|
ppool.getPool().getPoolName(), 7);
|
||||||
|
|
||||||
|
for (ContainerReportsProto reportsProto : clist) {
|
||||||
|
ppool.handleContainerReport(reportsProto);
|
||||||
|
}
|
||||||
|
|
||||||
|
// We ignore container reports from the same datanodes.
|
||||||
|
// It is possible that each of these containers gets placed
|
||||||
|
// on the same datanodes, so allow for 4 duplicates in the set of 14.
|
||||||
|
GenericTestUtils.waitFor(() -> ppool.getContainerProcessedCount() > 10,
|
||||||
|
200, 1000);
|
||||||
|
ppool.setDoneProcessing();
|
||||||
|
|
||||||
|
List<Map.Entry<String, Integer>> containers = ppool.filterContainer(p -> p
|
||||||
|
.getValue() > 3);
|
||||||
|
Assert.assertEquals(2, containers.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
/**
|
||||||
|
* This test verifies that all pools are picked up for replica processing.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public void testAllPoolsAreProcessed() throws TimeoutException,
|
||||||
|
InterruptedException {
|
||||||
|
// Verify that we saw all three pools being picked up for processing.
|
||||||
|
GenericTestUtils.waitFor(() -> replicationManager.getPoolProcessCount()
|
||||||
|
>= 3, 200, 15 * 1000);
|
||||||
|
Assert.assertTrue(logCapturer.getOutput().contains("Pool1") &&
|
||||||
|
logCapturer.getOutput().contains("Pool2") &&
|
||||||
|
logCapturer.getOutput().contains("Pool3"));
|
||||||
|
}
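The tests above lean on the same polling pattern; a condensed sketch of it, assuming it sits inside a test method that declares TimeoutException and InterruptedException and uses the same logger as this class:

    // Capture the manager's log, poll every 200 ms, give up after 15 s.
    LogCapturer capturer = LogCapturer.captureLogs(
        LogFactory.getLog(ContainerReplicationManager.class));
    try {
      GenericTestUtils.waitFor(
          () -> capturer.getOutput().contains("Pool1"), 200, 15 * 1000);
    } finally {
      capturer.stopCapturing();
    }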
|
||||||
|
|
||||||
|
@Test
|
||||||
|
/**
|
||||||
|
* Adds a new pool and tests that we are able to pick up that new pool for
|
||||||
|
* processing as well as handle container reports for datanodes in that pool.
|
||||||
|
* @throws TimeoutException
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
|
public void testAddingNewPoolWorks() throws TimeoutException,
|
||||||
|
InterruptedException {
|
||||||
|
LogCapturer inProgressLog = LogCapturer.captureLogs(
|
||||||
|
LogFactory.getLog(InProgressPool.class));
|
||||||
|
GenericTestUtils.setLogLevel(InProgressPool.LOG, Level.ALL);
|
||||||
|
try {
|
||||||
|
DatanodeID id = SCMTestUtils.getDatanodeID();
|
||||||
|
((ReplicationNodeManagerMock) (nodeManager)).addNode(id, NodeManager
|
||||||
|
.NODESTATE.HEALTHY);
|
||||||
|
poolManager.addNode("PoolNew", id);
|
||||||
|
GenericTestUtils.waitFor(() ->
|
||||||
|
logCapturer.getOutput().contains("PoolNew"),
|
||||||
|
200, 15 * 1000);
|
||||||
|
|
||||||
|
// Assert that we are able to send a container report to this new
|
||||||
|
// pool and datanode.
|
||||||
|
List<ContainerReportsProto> clist =
|
||||||
|
datanodeStateManager.getContainerReport("NewContainer1",
|
||||||
|
"PoolNew", 1);
|
||||||
|
replicationManager.handleContainerReport(clist.get(0));
|
||||||
|
GenericTestUtils.waitFor(() ->
|
||||||
|
inProgressLog.getOutput().contains("NewContainer1") && inProgressLog
|
||||||
|
.getOutput().contains(id.getDatanodeUuid()), 200, 10 * 1000);
|
||||||
|
} finally {
|
||||||
|
inProgressLog.stopCapturing();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
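Condensed, the report-handling flow these tests exercise looks like the following sketch; the container name is illustrative, and every call is one already used in the class above:

    // Fabricate reports for one container from 3 datanodes in the pool,
    // feed them to the in-progress pool, then filter by replica count.
    InProgressPool pool = replicationManager.getInProcessPoolList().get(0);
    List<ContainerReportsProto> reports =
        datanodeStateManager.getContainerReport("ExampleContainer",
            pool.getPool().getPoolName(), 3);
    for (ContainerReportsProto report : reports) {
      pool.handleContainerReport(report);
    }
    pool.setDoneProcessing();
    List<Map.Entry<String, Integer>> underReplicated =
        pool.filterContainer(entry -> entry.getValue() < 3);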
|
|
@ -0,0 +1,18 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.container.replication;
|
||||||
|
// Test classes for replication.
|
|
@ -39,6 +39,7 @@ import java.io.IOException;
|
||||||
import java.lang.management.ManagementFactory;
|
import java.lang.management.ManagementFactory;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import javax.management.openmbean.CompositeData;
|
import javax.management.openmbean.CompositeData;
|
||||||
import javax.management.openmbean.TabularData;
|
import javax.management.openmbean.TabularData;
|
||||||
|
@ -53,12 +54,14 @@ public class TestSCMMXBean {
|
||||||
private static MBeanServer mbs;
|
private static MBeanServer mbs;
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void init() throws IOException {
|
public static void init() throws IOException, TimeoutException,
|
||||||
|
InterruptedException {
|
||||||
conf = new OzoneConfiguration();
|
conf = new OzoneConfiguration();
|
||||||
cluster = new MiniOzoneCluster.Builder(conf)
|
cluster = new MiniOzoneCluster.Builder(conf)
|
||||||
.numDataNodes(numOfDatanodes)
|
.numDataNodes(numOfDatanodes)
|
||||||
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED)
|
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED)
|
||||||
.build();
|
.build();
|
||||||
|
cluster.waitOzoneReady();
|
||||||
scm = cluster.getStorageContainerManager();
|
scm = cluster.getStorageContainerManager();
|
||||||
mbs = ManagementFactory.getPlatformMBeanServer();
|
mbs = ManagementFactory.getPlatformMBeanServer();
|
||||||
}
|
}
|
||||||
|
|
|
@ -251,6 +251,17 @@ public class MockNodeManager implements NodeManager {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the node state of a specific node.
|
||||||
|
*
|
||||||
|
* @param id - DatanodeID
|
||||||
|
* @return Healthy/Stale/Dead.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public NODESTATE getNodeState(DatanodeID id) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Closes this stream and releases any system resources associated with it. If
|
* Closes this stream and releases any system resources associated with it. If
|
||||||
* the stream is already closed then invoking this method has no effect.
|
* the stream is already closed then invoking this method has no effect.
|
||||||
|
|