From f854a89190bd2453ccb1bfaa123d63d546e913cd Mon Sep 17 00:00:00 2001
From: Arpit Agarwal
Date: Fri, 22 Mar 2019 14:36:29 -0700
Subject: [PATCH] HDDS-1205. Refactor ReplicationManager to handle QUASI_CLOSED
 containers. Contributed by Nanda kumar. (#620)

---
 .../apache/hadoop/hdds/scm/ScmConfigKeys.java |  12 +
 .../src/main/resources/ozone-default.xml      |  20 +
 .../hdds/scm/container/ContainerManager.java  |   8 +
 .../scm/container/ReplicationManager.java     | 748 ++++++++++++++++++
 .../scm/container/SCMContainerManager.java    |  10 +
 .../container/states/ContainerStateMap.java   |   9 +-
 .../org/apache/hadoop/hdds/scm/TestUtils.java |  53 +-
 .../container/TestContainerReportHandler.java |   8 +-
 .../container/TestContainerReportHelper.java  |  40 -
 ...TestIncrementalContainerReportHandler.java |   6 +-
 .../scm/container/TestReplicationManager.java | 625 +++++++++++++++
 11 files changed, 1485 insertions(+), 54 deletions(-)
 create mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
 create mode 100644 hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 4e197d354f7..3b45b89dc8b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -348,6 +348,18 @@ public final class ScmConfigKeys {
   public static final String HDDS_SCM_WATCHER_TIMEOUT_DEFAULT =
       "10m";

+  public static final String HDDS_SCM_REPLICATION_THREAD_INTERVAL =
+      "hdds.scm.replication.thread.interval";
+
+  public static final String HDDS_SCM_REPLICATION_THREAD_INTERVAL_DEFAULT =
+      "5m";
+
+  public static final String HDDS_SCM_REPLICATION_EVENT_TIMEOUT =
+      "hdds.scm.replication.event.timeout";
+
+  public static final String HDDS_SCM_REPLICATION_EVENT_TIMEOUT_DEFAULT =
+      "10m";
+
   public static final String HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY =
       "hdds.scm.http.kerberos.principal";

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 462a07bf595..9fd4ef3b70a 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -2357,4 +2357,24 @@
       Request to flush the OM DB before taking checkpoint snapshot.
     </description>
   </property>
+  <property>
+    <name>hdds.scm.replication.thread.interval</name>
+    <value>5m</value>
+    <tag>OZONE, SCM</tag>
+    <description>
+      There is a replication monitor thread running inside SCM which
+      takes care of replicating the containers in the cluster. This
+      property is used to configure the interval at which that thread
+      runs.
+    </description>
+  </property>
+  <property>
+    <name>hdds.scm.replication.event.timeout</name>
+    <value>10m</value>
+    <tag>OZONE, SCM</tag>
+    <description>
+      Timeout for the container replication/deletion commands sent
+      to datanodes. After this timeout the command will be retried.
+    </description>
+  </property>
 </configuration>
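Editor's note: the two new settings accept human-readable durations ("5m", "10m"). A minimal sketch of how they are read, mirroring what the ReplicationManager constructor further down does; the standalone class name is mine, for illustration only:

```java
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;

public final class ReplicationConfigDemo {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Duration-style values such as "5m" or "10m" are parsed by
    // getTimeDuration into the requested unit.
    long intervalMs = conf.getTimeDuration(
        ScmConfigKeys.HDDS_SCM_REPLICATION_THREAD_INTERVAL,
        ScmConfigKeys.HDDS_SCM_REPLICATION_THREAD_INTERVAL_DEFAULT,
        TimeUnit.MILLISECONDS);
    long timeoutMs = conf.getTimeDuration(
        ScmConfigKeys.HDDS_SCM_REPLICATION_EVENT_TIMEOUT,
        ScmConfigKeys.HDDS_SCM_REPLICATION_EVENT_TIMEOUT_DEFAULT,
        TimeUnit.MILLISECONDS);
    System.out.println("interval=" + intervalMs
        + "ms, timeout=" + timeoutMs + "ms");
  }
}
```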
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
index b2fe4b46b3b..717d58ddeb0 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
@@ -34,6 +34,14 @@ import java.util.Set;
  */
 public interface ContainerManager extends Closeable {

+
+  /**
+   * Returns all the container Ids managed by ContainerManager.
+   *
+   * @return Set of ContainerID
+   */
+  Set<ContainerID> getContainerIDs();
+
   /**
    * Returns all the containers managed by ContainerManager.
    *
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
new file mode 100644
index 00000000000..97c600b4112
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
@@ -0,0 +1,748 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.GeneratedMessage;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+    .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.lock.LockManager;
+import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * Replication Manager (RM) is responsible for making sure that containers
+ * are properly replicated. Replication Manager deals only with
+ * QUASI_CLOSED / CLOSED containers.
+ */
+public class ReplicationManager {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReplicationManager.class);
+
+  /**
+   * Reference to the ContainerManager.
+   */
+  private final ContainerManager containerManager;
+
+  /**
+   * PlacementPolicy which is used to identify where a container
+   * should be replicated.
+   */
+  private final ContainerPlacementPolicy containerPlacement;
+
+  /**
+   * EventPublisher to fire Replicate and Delete container events.
+   */
+  private final EventPublisher eventPublisher;
+
+  /**
+   * Used for locking a container using its ID while processing it.
+   */
+  private final LockManager<ContainerID> lockManager;
+
+  /**
+   * This is used for tracking container replication commands which are issued
+   * by ReplicationManager and not yet complete.
+   */
+  private final Map<ContainerID, List<InflightAction>> inflightReplication;
+
+  /**
+   * This is used for tracking container deletion commands which are issued
+   * by ReplicationManager and not yet complete.
+   */
+  private final Map<ContainerID, List<InflightAction>> inflightDeletion;
+
+  /**
+   * ReplicationMonitor thread wakes up at the configured interval and
+   * processes all the containers.
+ */ + private final Thread replicationMonitor; + + /** + * The frequency in which ReplicationMonitor thread should run. + */ + private final long interval; + + /** + * Timeout for container replication & deletion command issued by + * ReplicationManager. + */ + private final long eventTimeout; + + /** + * Flag used for checking if the ReplicationMonitor thread is running or + * not. + */ + private volatile boolean running; + + /** + * Constructs ReplicationManager instance with the given configuration. + * + * @param conf OzoneConfiguration + * @param containerManager ContainerManager + * @param containerPlacement ContainerPlacementPolicy + * @param eventPublisher EventPublisher + */ + public ReplicationManager(final Configuration conf, + final ContainerManager containerManager, + final ContainerPlacementPolicy containerPlacement, + final EventPublisher eventPublisher) { + this.containerManager = containerManager; + this.containerPlacement = containerPlacement; + this.eventPublisher = eventPublisher; + this.lockManager = new LockManager<>(conf); + this.inflightReplication = new HashMap<>(); + this.inflightDeletion = new HashMap<>(); + this.replicationMonitor = new Thread(this::run); + this.replicationMonitor.setName("ReplicationMonitor"); + this.replicationMonitor.setDaemon(true); + this.interval = conf.getTimeDuration( + ScmConfigKeys.HDDS_SCM_REPLICATION_THREAD_INTERVAL, + ScmConfigKeys.HDDS_SCM_REPLICATION_THREAD_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + this.eventTimeout = conf.getTimeDuration( + ScmConfigKeys.HDDS_SCM_REPLICATION_EVENT_TIMEOUT, + ScmConfigKeys.HDDS_SCM_REPLICATION_EVENT_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); + this.running = false; + } + + /** + * Starts Replication Monitor thread. + */ + public synchronized void start() { + if (!running) { + LOG.info("Starting Replication Monitor Thread."); + running = true; + replicationMonitor.start(); + } else { + LOG.info("Replication Monitor Thread is already running."); + } + } + + /** + * Process all the containers immediately. + */ + @VisibleForTesting + @SuppressFBWarnings(value="NN_NAKED_NOTIFY", + justification="Used only for testing") + synchronized void processContainersNow() { + notify(); + } + + /** + * Stops Replication Monitor thread. + */ + public synchronized void stop() { + if (running) { + LOG.info("Stopping Replication Monitor Thread."); + running = false; + notify(); + } else { + LOG.info("Replication Monitor Thread is not running."); + } + } + + /** + * ReplicationMonitor thread runnable. This wakes up at configured + * interval and processes all the containers in the system. + */ + private synchronized void run() { + try { + while (running) { + final long start = Time.monotonicNow(); + final Set containerIds = + containerManager.getContainerIDs(); + containerIds.forEach(this::processContainer); + + LOG.info("Replication Monitor Thread took {} milliseconds for" + + " processing {} containers.", Time.monotonicNow() - start, + containerIds.size()); + + wait(interval); + } + } catch (Throwable t) { + // When we get runtime exception, we should terminate SCM. + LOG.error("Exception in Replication Monitor Thread.", t); + ExitUtil.terminate(1, t); + } + } + + /** + * Process the given container. 
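Editor's note: start()/stop()/processContainersNow() and run() above implement a classic wait/notify monitor loop — the worker waits on its own monitor so a notify() can either cut the sleep short or let the thread see running == false and exit promptly. A self-contained sketch of the idiom (IntervalMonitor and its members are illustrative, not part of the patch):

```java
public class IntervalMonitor {
  private final long intervalMs;
  private volatile boolean running = true;

  public IntervalMonitor(long intervalMs) {
    this.intervalMs = intervalMs;
  }

  public synchronized void run() {
    try {
      while (running) {
        doWork();          // stand-in for processing all containers
        wait(intervalMs);  // releases the monitor while sleeping
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public synchronized void wakeUp() {  // analogous to processContainersNow()
    notify();
  }

  public synchronized void stop() {
    running = false;
    notify();  // wake the loop so it observes running == false
  }

  private void doWork() { /* ... */ }
}
```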
+   *
+   * @param id ContainerID
+   */
+  private void processContainer(ContainerID id) {
+    lockManager.lock(id);
+    try {
+      final ContainerInfo container = containerManager.getContainer(id);
+      final Set<ContainerReplica> replicas = containerManager
+          .getContainerReplicas(container.containerID());
+      final LifeCycleState state = container.getState();
+
+      /*
+       * We don't take any action if the container is in OPEN state.
+       */
+      if (state == LifeCycleState.OPEN) {
+        return;
+      }
+
+      /*
+       * If the container is in CLOSING state, the replicas can either
+       * be in OPEN or in CLOSING state. In both of these cases
+       * we have to resend the close container command to the datanodes.
+       */
+      if (state == LifeCycleState.CLOSING) {
+        replicas.forEach(replica -> sendCloseCommand(
+            container, replica.getDatanodeDetails(), false));
+        return;
+      }
+
+      /*
+       * If the container is in QUASI_CLOSED state, check and close the
+       * container if possible.
+       */
+      if (state == LifeCycleState.QUASI_CLOSED &&
+          canForceCloseContainer(container, replicas)) {
+        forceCloseContainer(container, replicas);
+        return;
+      }
+
+      /*
+       * Before processing the container we have to reconcile the
+       * inflightReplication and inflightDeletion actions.
+       *
+       * We remove the entry from the inflightReplication and
+       * inflightDeletion lists if the operation is completed or if it
+       * has timed out.
+       */
+      updateInflightAction(container, inflightReplication,
+          action -> replicas.stream()
+              .anyMatch(r -> r.getDatanodeDetails().equals(action.datanode)));
+
+      updateInflightAction(container, inflightDeletion,
+          action -> replicas.stream()
+              .noneMatch(r -> r.getDatanodeDetails().equals(action.datanode)));
+
+
+      /*
+       * We don't have to take any action if the container is healthy.
+       *
+       * According to ReplicationMonitor a container is considered healthy
+       * if the container is either in QUASI_CLOSED or in CLOSED state and
+       * has the exact number of replicas in the same state.
+       */
+      if (isContainerHealthy(container, replicas)) {
+        return;
+      }
+
+      /*
+       * Check if the container is under replicated and take appropriate
+       * action.
+       */
+      if (isContainerUnderReplicated(container, replicas)) {
+        handleUnderReplicatedContainer(container, replicas);
+        return;
+      }
+
+      /*
+       * Check if the container is over replicated and take appropriate
+       * action.
+       */
+      if (isContainerOverReplicated(container, replicas)) {
+        handleOverReplicatedContainer(container, replicas);
+        return;
+      }
+
+      /*
+       * The container is neither under nor over replicated and the
+       * container is not healthy. This means that the container has an
+       * unhealthy/corrupted replica.
+       */
+      handleUnstableContainer(container, replicas);
+
+    } catch (ContainerNotFoundException ex) {
+      LOG.warn("Missing container {}.", id);
+    } finally {
+      lockManager.unlock(id);
+    }
+  }
+
+  /**
+   * Reconciles the InflightActions for a given container.
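Editor's note: the reconciliation step above prunes tracked commands on two conditions — timeout and completion. A generic sketch of the same idiom (InflightTracker and its type parameters are mine; the patch itself uses a plain HashMap/List keyed by ContainerID with the private InflightAction class):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

class InflightTracker<K, A> {
  private final Map<K, List<A>> inflight = new HashMap<>();

  void track(K key, A action) {
    inflight.computeIfAbsent(key, k -> new ArrayList<>()).add(action);
  }

  /** Drop actions that timed out or that the filter says are complete. */
  void reconcile(K key, Predicate<A> timedOut, Predicate<A> completed) {
    List<A> actions = inflight.get(key);
    if (actions == null) {
      return;
    }
    actions.removeIf(timedOut);
    actions.removeIf(completed);
    if (actions.isEmpty()) {
      inflight.remove(key);  // keep the map from growing unboundedly
    }
  }
}
```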
+ * + * @param container Container to update + * @param inflightActions inflightReplication (or) inflightDeletion + * @param filter filter to check if the operation is completed + */ + private void updateInflightAction(final ContainerInfo container, + final Map> inflightActions, + final Predicate filter) { + final ContainerID id = container.containerID(); + final long deadline = Time.monotonicNow() - eventTimeout; + if (inflightActions.containsKey(id)) { + final List actions = inflightActions.get(id); + actions.removeIf(action -> action.time < deadline); + actions.removeIf(filter); + if (actions.isEmpty()) { + inflightActions.remove(id); + } + } + } + + /** + * Returns true if the container is healthy according to ReplicationMonitor. + * + * According to ReplicationMonitor container is considered healthy if + * it has exact number of replicas in the same state as the container. + * + * @param container Container to check + * @param replicas Set of ContainerReplicas + * @return true if the container is healthy, false otherwise + */ + private boolean isContainerHealthy(final ContainerInfo container, + final Set replicas) { + return container.getReplicationFactor().getNumber() == replicas.size() && + replicas.stream().allMatch( + r -> compareState(container.getState(), r.getState())); + } + + /** + * Checks if the container is under replicated or not. + * + * @param container Container to check + * @param replicas Set of ContainerReplicas + * @return true if the container is under replicated, false otherwise + */ + private boolean isContainerUnderReplicated(final ContainerInfo container, + final Set replicas) { + return container.getReplicationFactor().getNumber() > + getReplicaCount(container.containerID(), replicas); + } + + /** + * Checks if the container is over replicated or not. + * + * @param container Container to check + * @param replicas Set of ContainerReplicas + * @return true if the container if over replicated, false otherwise + */ + private boolean isContainerOverReplicated(final ContainerInfo container, + final Set replicas) { + return container.getReplicationFactor().getNumber() < + getReplicaCount(container.containerID(), replicas); + } + + /** + * Returns the replication count of the given container. This also + * considers inflight replication and deletion. + * + * @param id ContainerID + * @param replicas Set of existing replicas + * @return number of estimated replicas for this container + */ + private int getReplicaCount(final ContainerID id, + final Set replicas) { + return replicas.size() + + inflightReplication.getOrDefault(id, Collections.emptyList()).size() + - inflightDeletion.getOrDefault(id, Collections.emptyList()).size(); + } + + /** + * Returns true if more than 50% of the container replicas with unique + * originNodeId are in QUASI_CLOSED state. 
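Editor's note: a worked example of the quorum arithmetic used just below. With replication factor 3, replicationFactor / 2 is 1 under integer division, so at least two QUASI_CLOSED replicas with distinct origin datanodes are required before a force close is allowed:

```java
import java.util.Arrays;
import java.util.List;
import java.util.UUID;

public final class QuorumDemo {
  public static void main(String[] args) {
    int replicationFactor = 3;
    UUID origin = UUID.randomUUID();
    // Two replicas sharing one origin count only once after distinct().
    List<UUID> originsOfQuasiClosedReplicas =
        Arrays.asList(origin, origin, UUID.randomUUID());
    long unique = originsOfQuasiClosedReplicas.stream().distinct().count();
    System.out.println(unique > replicationFactor / 2);  // 2 > 1 -> true
  }
}
```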
+   *
+   * @param container Container to check
+   * @param replicas Set of ContainerReplicas
+   * @return true if we can force close the container, false otherwise
+   */
+  private boolean canForceCloseContainer(final ContainerInfo container,
+      final Set<ContainerReplica> replicas) {
+    Preconditions.assertTrue(container.getState() ==
+        LifeCycleState.QUASI_CLOSED);
+    final int replicationFactor = container.getReplicationFactor().getNumber();
+    final long uniqueQuasiClosedReplicaCount = replicas.stream()
+        .filter(r -> r.getState() == State.QUASI_CLOSED)
+        .map(ContainerReplica::getOriginDatanodeId)
+        .distinct()
+        .count();
+    return uniqueQuasiClosedReplicaCount > (replicationFactor / 2);
+  }
+
+  /**
+   * Force close the container replica(s) with highest sequence Id.
+   *
+   *

+ * Note: We should force close the container only if >50% (quorum) + * of replicas with unique originNodeId are in QUASI_CLOSED state. + *

+   *
+   * @param container ContainerInfo
+   * @param replicas Set of ContainerReplicas
+   */
+  private void forceCloseContainer(final ContainerInfo container,
+      final Set<ContainerReplica> replicas) {
+    Preconditions.assertTrue(container.getState() ==
+        LifeCycleState.QUASI_CLOSED);
+
+    final List<ContainerReplica> quasiClosedReplicas = replicas.stream()
+        .filter(r -> r.getState() == State.QUASI_CLOSED)
+        .collect(Collectors.toList());
+
+    final Long sequenceId = quasiClosedReplicas.stream()
+        .map(ContainerReplica::getSequenceId)
+        .max(Long::compare)
+        .orElse(-1L);
+
+    LOG.info("Force closing container {} with BCSID {}," +
+        " which is in QUASI_CLOSED state.",
+        container.containerID(), sequenceId);
+
+    quasiClosedReplicas.stream()
+        .filter(r -> sequenceId != -1L)
+        .filter(replica -> replica.getSequenceId().equals(sequenceId))
+        .forEach(replica -> sendCloseCommand(
+            container, replica.getDatanodeDetails(), true));
+  }
+
+  /**
+   * If the given container is under replicated, identify a new set of
+   * datanode(s) to replicate the container using ContainerPlacementPolicy
+   * and send replicate container command to the identified datanode(s).
+   *
+   * @param container ContainerInfo
+   * @param replicas Set of ContainerReplicas
+   */
+  private void handleUnderReplicatedContainer(final ContainerInfo container,
+      final Set<ContainerReplica> replicas) {
+    try {
+      final ContainerID id = container.containerID();
+      final List<DatanodeDetails> deletionInFlight = inflightDeletion
+          .getOrDefault(id, Collections.emptyList())
+          .stream()
+          .map(action -> action.datanode)
+          .collect(Collectors.toList());
+      final List<DatanodeDetails> source = replicas.stream()
+          .filter(r ->
+              r.getState() == State.QUASI_CLOSED ||
+              r.getState() == State.CLOSED)
+          .filter(r -> !deletionInFlight.contains(r.getDatanodeDetails()))
+          .sorted((r1, r2) -> r2.getSequenceId().compareTo(r1.getSequenceId()))
+          .map(ContainerReplica::getDatanodeDetails)
+          .collect(Collectors.toList());
+      if (source.size() > 0) {
+        final int replicationFactor = container
+            .getReplicationFactor().getNumber();
+        final int delta = replicationFactor - getReplicaCount(id, replicas);
+        final List<DatanodeDetails> selectedDatanodes = containerPlacement
+            .chooseDatanodes(source, delta, container.getUsedBytes());
+
+        LOG.info("Container {} is under replicated. Expected replica count" +
+            " is {}, but found {}.", id, replicationFactor,
+            replicationFactor - delta);
+
+        for (DatanodeDetails datanode : selectedDatanodes) {
+          sendReplicateCommand(container, datanode, source);
+        }
+      } else {
+        LOG.warn("Cannot replicate container {}, no healthy replica found.",
+            container.containerID());
+      }
+    } catch (IOException ex) {
+      LOG.warn("Exception while replicating container {}.",
+          container.getContainerID(), ex);
+    }
+  }
+
+  /**
+   * If the given container is over replicated, identify the datanode(s)
+   * to delete the container and send delete container command to the
+   * identified datanode(s).
+   *
+   * @param container ContainerInfo
+   * @param replicas Set of ContainerReplicas
+   */
+  private void handleOverReplicatedContainer(final ContainerInfo container,
+      final Set<ContainerReplica> replicas) {
+
+    final ContainerID id = container.containerID();
+    final int replicationFactor = container.getReplicationFactor().getNumber();
+    // Don't consider inflight replication while calculating excess here.
+    final int excess = replicas.size() - replicationFactor -
+        inflightDeletion.getOrDefault(id, Collections.emptyList()).size();
+
+    if (excess > 0) {
+
+      LOG.info("Container {} is over replicated. Expected replica count" +
+          " is {}, but found {}.", id, replicationFactor,
+          replicationFactor + excess);
+
+      final Map<UUID, ContainerReplica> uniqueReplicas =
+          new LinkedHashMap<>();
+
+      replicas.stream()
+          .filter(r -> compareState(container.getState(), r.getState()))
+          .forEach(r -> uniqueReplicas
+              .putIfAbsent(r.getOriginDatanodeId(), r));
+
+      // Retain one healthy replica per origin node Id.
+      final List<ContainerReplica> eligibleReplicas = new ArrayList<>(replicas);
+      eligibleReplicas.removeAll(uniqueReplicas.values());
+
+      final List<ContainerReplica> unhealthyReplicas = eligibleReplicas
+          .stream()
+          .filter(r -> !compareState(container.getState(), r.getState()))
+          .collect(Collectors.toList());
+
+      // Move the unhealthy replicas to the front of eligible replicas
+      // to delete
+      eligibleReplicas.removeAll(unhealthyReplicas);
+      eligibleReplicas.addAll(0, unhealthyReplicas);
+
+      for (int i = 0; i < excess; i++) {
+        sendDeleteCommand(container,
+            eligibleReplicas.get(i).getDatanodeDetails(), true);
+      }
+    }
+  }
+
+  /**
+   * Handles unstable container.
+   * A container is inconsistent if any replica's state doesn't
+   * match the container state. We have to take appropriate action
+   * based on the state of the replica.
+   *
+   * @param container ContainerInfo
+   * @param replicas Set of ContainerReplicas
+   */
+  private void handleUnstableContainer(final ContainerInfo container,
+      final Set<ContainerReplica> replicas) {
+    // Find unhealthy replicas
+    List<ContainerReplica> unhealthyReplicas = replicas.stream()
+        .filter(r -> !compareState(container.getState(), r.getState()))
+        .collect(Collectors.toList());
+
+    Iterator<ContainerReplica> iterator = unhealthyReplicas.iterator();
+    while (iterator.hasNext()) {
+      final ContainerReplica replica = iterator.next();
+      final State state = replica.getState();
+      if (state == State.OPEN || state == State.CLOSING) {
+        sendCloseCommand(container, replica.getDatanodeDetails(), false);
+        iterator.remove();
+      }
+
+      if (state == State.QUASI_CLOSED) {
+        // Send force close command if the BCSID matches
+        if (container.getSequenceId() == replica.getSequenceId()) {
+          sendCloseCommand(container, replica.getDatanodeDetails(), true);
+          iterator.remove();
+        }
+      }
+    }
+
+    // Now we are left with the replicas which are either unhealthy or
+    // the BCSID doesn't match. These replicas should be deleted.
+
+    /*
+     * If we have unhealthy replicas we go under replicated and then
+     * replicate the healthy copy.
+     *
+     * We also make sure that we delete only one unhealthy replica at a time.
+     *
+     * If there are two unhealthy replicas:
+     *  - Delete the first unhealthy replica
+     *  - Re-replicate the healthy copy
+     *  - Delete the second unhealthy replica
+     *  - Re-replicate the healthy copy
+     *
+     * Note: Only one action will be executed in a single ReplicationMonitor
+     * iteration. So to complete all the above actions we need four
+     * ReplicationMonitor iterations.
+     */
+
+    unhealthyReplicas.stream().findFirst().ifPresent(replica ->
+        sendDeleteCommand(container, replica.getDatanodeDetails(), false));
+
+  }
+
+  /**
+   * Sends close container command for the given container to the given
+   * datanode.
+ * + * @param container Container to be closed + * @param datanode The datanode on which the container + * has to be closed + * @param force Should be set to true if we want to close a + * QUASI_CLOSED container + */ + private void sendCloseCommand(final ContainerInfo container, + final DatanodeDetails datanode, + final boolean force) { + + LOG.info("Sending close container command for container {}" + + " to datanode {}.", container.containerID(), datanode); + + CloseContainerCommand closeContainerCommand = + new CloseContainerCommand(container.getContainerID(), + container.getPipelineID(), force); + eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, + new CommandForDatanode<>(datanode.getUuid(), closeContainerCommand)); + } + + /** + * Sends replicate container command for the given container to the given + * datanode. + * + * @param container Container to be replicated + * @param datanode The destination datanode to replicate + * @param sources List of source nodes from where we can replicate + */ + private void sendReplicateCommand(final ContainerInfo container, + final DatanodeDetails datanode, + final List sources) { + + LOG.info("Sending replicate container command for container {}" + + " to datanode {}", container.containerID(), datanode); + + final ContainerID id = container.containerID(); + final ReplicateContainerCommand replicateCommand = + new ReplicateContainerCommand(id.getId(), sources); + inflightReplication.computeIfAbsent(id, k -> new ArrayList<>()); + sendAndTrackDatanodeCommand(datanode, replicateCommand, + action -> inflightReplication.get(id).add(action)); + } + + /** + * Sends delete container command for the given container to the given + * datanode. + * + * @param container Container to be deleted + * @param datanode The datanode on which the replica should be deleted + * @param force Should be set to true to delete an OPEN replica + */ + private void sendDeleteCommand(final ContainerInfo container, + final DatanodeDetails datanode, + final boolean force) { + + LOG.info("Sending delete container command for container {}" + + " to datanode {}", container.containerID(), datanode); + + final ContainerID id = container.containerID(); + final DeleteContainerCommand deleteCommand = + new DeleteContainerCommand(id.getId(), force); + inflightDeletion.computeIfAbsent(id, k -> new ArrayList<>()); + sendAndTrackDatanodeCommand(datanode, deleteCommand, + action -> inflightDeletion.get(id).add(action)); + } + + /** + * Creates CommandForDatanode with the given SCMCommand and fires + * DATANODE_COMMAND event to event queue. + * + * Tracks the command using the given tracker. + * + * @param datanode Datanode to which the command has to be sent + * @param command SCMCommand to be sent + * @param tracker Tracker which tracks the inflight actions + * @param Type of SCMCommand + */ + private void sendAndTrackDatanodeCommand( + final DatanodeDetails datanode, + final SCMCommand command, + final Consumer tracker) { + final CommandForDatanode datanodeCommand = + new CommandForDatanode<>(datanode.getUuid(), command); + eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand); + tracker.accept(new InflightAction(datanode, Time.monotonicNow())); + } + + /** + * Compares the container state with the replica state. 
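Editor's note: the switch in compareState below pairs each container LifeCycleState with the single replica state considered "matching". For readers, here is the same mapping expressed as a lookup table — a reading aid, not a proposed change; the local enums stand in for HddsProtos.LifeCycleState and ContainerReplicaProto.State:

```java
import java.util.EnumMap;
import java.util.Map;

enum LifeCycleState { OPEN, CLOSING, QUASI_CLOSED, CLOSED, DELETING, DELETED }
enum ReplicaState { OPEN, CLOSING, QUASI_CLOSED, CLOSED, UNHEALTHY }

final class StateMatch {
  private static final Map<LifeCycleState, ReplicaState> MATCHING =
      new EnumMap<>(LifeCycleState.class);
  static {
    MATCHING.put(LifeCycleState.OPEN, ReplicaState.OPEN);
    MATCHING.put(LifeCycleState.CLOSING, ReplicaState.CLOSING);
    MATCHING.put(LifeCycleState.QUASI_CLOSED, ReplicaState.QUASI_CLOSED);
    MATCHING.put(LifeCycleState.CLOSED, ReplicaState.CLOSED);
    // DELETING / DELETED have no matching replica state.
  }

  static boolean compareState(LifeCycleState c, ReplicaState r) {
    return MATCHING.get(c) == r;  // null for DELETING/DELETED -> false
  }
}
```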
+ * + * @param containerState ContainerState + * @param replicaState ReplicaState + * @return true if the state matches, false otherwise + */ + private static boolean compareState(final LifeCycleState containerState, + final State replicaState) { + switch (containerState) { + case OPEN: + return replicaState == State.OPEN; + case CLOSING: + return replicaState == State.CLOSING; + case QUASI_CLOSED: + return replicaState == State.QUASI_CLOSED; + case CLOSED: + return replicaState == State.CLOSED; + case DELETING: + return false; + case DELETED: + return false; + default: + return false; + } + } + + /** + * Wrapper class to hold the InflightAction with its start time. + */ + private static final class InflightAction { + + private final DatanodeDetails datanode; + private final long time; + + private InflightAction(final DatanodeDetails datanode, + final long time) { + this.datanode = datanode; + this.time = time; + } + } +} \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java index 2615289f434..6dd1949eaf6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java @@ -131,6 +131,16 @@ public class SCMContainerManager implements ContainerManager { return containerStateManager; } + @Override + public Set getContainerIDs() { + lock.lock(); + try { + return containerStateManager.getAllContainerIDs(); + } finally { + lock.unlock(); + } + } + @Override public List getContainers() { lock.lock(); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java index 0c738b2f3b4..2aba72487c5 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java @@ -35,7 +35,6 @@ import org.slf4j.LoggerFactory; import java.util.HashSet; import java.util.Set; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.NavigableSet; import java.util.TreeSet; @@ -107,9 +106,9 @@ public class ContainerStateMap { this.ownerMap = new ContainerAttribute<>(); this.factorMap = new ContainerAttribute<>(); this.typeMap = new ContainerAttribute<>(); - this.containerMap = new HashMap<>(); + this.containerMap = new ConcurrentHashMap<>(); this.lock = new ReentrantReadWriteLock(); - this.replicaMap = new HashMap<>(); + this.replicaMap = new ConcurrentHashMap<>(); this.resultCache = new ConcurrentHashMap<>(); } @@ -208,7 +207,7 @@ public class ContainerStateMap { try { checkIfContainerExist(containerID); return Collections - .unmodifiableSet(new HashSet<>(replicaMap.get(containerID))); + .unmodifiableSet(replicaMap.get(containerID)); } finally { lock.readLock().unlock(); } @@ -342,7 +341,7 @@ public class ContainerStateMap { } public Set getAllContainerIDs() { - return containerMap.keySet(); + return Collections.unmodifiableSet(containerMap.keySet()); } /** diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java index 
a5284e4e7a2..19c35fd462f 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hdds.scm; import com.google.common.base.Preconditions; +import org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineAction; @@ -30,6 +31,8 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineReport; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.server.SCMConfigurator; import org.apache.hadoop.hdds.scm.server @@ -66,7 +69,9 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; @@ -98,7 +103,7 @@ public final class TestUtils { * * @return DatanodeDetails */ - private static DatanodeDetails createDatanodeDetails(UUID uuid) { + public static DatanodeDetails createDatanodeDetails(UUID uuid) { String ipAddress = random.nextInt(256) + "." + random.nextInt(256) + "." + random.nextInt(256) @@ -521,4 +526,50 @@ public final class TestUtils { return new StorageContainerManager(conf, configurator); } + public static ContainerInfo getContainer( + final HddsProtos.LifeCycleState state) { + return new ContainerInfo.Builder() + .setContainerID(RandomUtils.nextLong()) + .setReplicationType(HddsProtos.ReplicationType.RATIS) + .setReplicationFactor(HddsProtos.ReplicationFactor.THREE) + .setState(state) + .setOwner("TEST") + .build(); + } + + public static Set getReplicas( + final ContainerID containerId, + final ContainerReplicaProto.State state, + final DatanodeDetails... datanodeDetails) { + return getReplicas(containerId, state, 10000L, datanodeDetails); + } + + public static Set getReplicas( + final ContainerID containerId, + final ContainerReplicaProto.State state, + final long sequenceId, + final DatanodeDetails... 
datanodeDetails) { + Set replicas = new HashSet<>(); + for (DatanodeDetails datanode : datanodeDetails) { + replicas.add(getReplicas(containerId, state, + sequenceId, datanode.getUuid(), datanode)); + } + return replicas; + } + + public static ContainerReplica getReplicas( + final ContainerID containerId, + final ContainerReplicaProto.State state, + final long sequenceId, + final UUID originNodeId, + final DatanodeDetails datanodeDetails) { + return ContainerReplica.newBuilder() + .setContainerID(containerId) + .setContainerState(state) + .setDatanodeDetails(datanodeDetails) + .setOriginNodeId(originNodeId) + .setSequenceId(sequenceId) + .build(); + } + } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java index 864a1a9936e..0b7cae48ad9 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java @@ -47,10 +47,10 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.hadoop.hdds.scm.container - .TestContainerReportHelper.getReplicas; -import static org.apache.hadoop.hdds.scm.container - .TestContainerReportHelper.getContainer; +import static org.apache.hadoop.hdds.scm.TestUtils + .getReplicas; +import static org.apache.hadoop.hdds.scm.TestUtils + .getContainer; import static org.apache.hadoop.hdds.scm.container .TestContainerReportHelper.addContainerToContainerManager; import static org.apache.hadoop.hdds.scm.container diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHelper.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHelper.java index 0fb50a426ab..860ec4d7fda 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHelper.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHelper.java @@ -16,19 +16,12 @@ */ package org.apache.hadoop.hdds.scm.container; -import org.apache.commons.lang3.RandomUtils; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; import org.mockito.Mockito; import org.mockito.stubbing.Answer; import java.io.IOException; -import java.util.HashSet; import java.util.Set; /** @@ -77,37 +70,4 @@ public final class TestContainerReportHelper { containerInfo.containerID(), event); } - public static ContainerInfo getContainer(final LifeCycleState state) { - return new ContainerInfo.Builder() - .setContainerID(RandomUtils.nextLong()) - .setReplicationType(ReplicationType.RATIS) - .setReplicationFactor(ReplicationFactor.THREE) - .setState(state) - .build(); - } - - static Set getReplicas( - final ContainerID containerId, - final ContainerReplicaProto.State state, - final DatanodeDetails... 
datanodeDetails) { - return getReplicas(containerId, state, 10000L, datanodeDetails); - } - - static Set getReplicas( - final ContainerID containerId, - final ContainerReplicaProto.State state, - final long sequenceId, - final DatanodeDetails... datanodeDetails) { - Set replicas = new HashSet<>(); - for (DatanodeDetails datanode : datanodeDetails) { - replicas.add(ContainerReplica.newBuilder() - .setContainerID(containerId) - .setContainerState(state) - .setDatanodeDetails(datanode) - .setOriginNodeId(datanode.getUuid()) - .setSequenceId(sequenceId) - .build()); - } - return replicas; - } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java index 23e96dd542c..6c9383f0360 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java @@ -38,10 +38,8 @@ import java.util.Set; import static org.apache.hadoop.hdds.scm.container .TestContainerReportHelper.addContainerToContainerManager; -import static org.apache.hadoop.hdds.scm.container - .TestContainerReportHelper.getContainer; -import static org.apache.hadoop.hdds.scm.container - .TestContainerReportHelper.getReplicas; +import static org.apache.hadoop.hdds.scm.TestUtils.getContainer; +import static org.apache.hadoop.hdds.scm.TestUtils.getReplicas; import static org.apache.hadoop.hdds.scm.container .TestContainerReportHelper.mockUpdateContainerReplica; import static org.apache.hadoop.hdds.scm.container diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java new file mode 100644 index 00000000000..83b9aa33e1b --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java @@ -0,0 +1,625 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.container; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.hdds.scm.container.placement.algorithms + .ContainerPlacementPolicy; +import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.server.events.EventHandler; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.hdds.server.events.EventQueue; +import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.hadoop.hdds.scm.TestUtils.createDatanodeDetails; +import static org.apache.hadoop.hdds.scm.TestUtils.getContainer; +import static org.apache.hadoop.hdds.scm.TestUtils.getReplicas; +import static org.apache.hadoop.hdds.scm.TestUtils.randomDatanodeDetails; + +/** + * Test cases to verify the functionality of ReplicationManager. 
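Editor's note: the tests below follow each processContainersNow() call with a fixed Thread.sleep(100L) to give the EventQueue time to deliver events. If GenericTestUtils from the hadoop-common test artifacts is available to this module (unverified here), polling for the expected count is less timing-sensitive; a sketch:

```java
import java.util.concurrent.TimeoutException;
import java.util.function.IntSupplier;

import org.apache.hadoop.test.GenericTestUtils;

final class EventQueueAwait {
  // Poll until the handler has seen at least `expected` commands instead of
  // sleeping a fixed 100 ms and hoping the EventQueue has drained by then.
  static void awaitInvocations(IntSupplier invocationCount, int expected)
      throws TimeoutException, InterruptedException {
    GenericTestUtils.waitFor(
        () -> invocationCount.getAsInt() >= expected,
        10,     // poll every 10 ms
        1000);  // give up after 1 second
  }
}
```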
+ */
+public class TestReplicationManager {
+
+  private ReplicationManager replicationManager;
+  private ContainerStateManager containerStateManager;
+  private ContainerPlacementPolicy containerPlacementPolicy;
+  private EventQueue eventQueue;
+  private DatanodeCommandHandler datanodeCommandHandler;
+
+  @Before
+  public void setup() throws IOException, InterruptedException {
+    final Configuration conf = new OzoneConfiguration();
+    final ContainerManager containerManager =
+        Mockito.mock(ContainerManager.class);
+    eventQueue = new EventQueue();
+    containerStateManager = new ContainerStateManager(conf);
+
+    datanodeCommandHandler = new DatanodeCommandHandler();
+    eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, datanodeCommandHandler);
+
+    Mockito.when(containerManager.getContainerIDs())
+        .thenAnswer(invocation -> containerStateManager.getAllContainerIDs());
+
+    Mockito.when(containerManager.getContainer(Mockito.any(ContainerID.class)))
+        .thenAnswer(invocation -> containerStateManager
+            .getContainer((ContainerID)invocation.getArguments()[0]));
+
+    Mockito.when(containerManager.getContainerReplicas(
+        Mockito.any(ContainerID.class)))
+        .thenAnswer(invocation -> containerStateManager
+            .getContainerReplicas((ContainerID)invocation.getArguments()[0]));
+
+    containerPlacementPolicy = Mockito.mock(ContainerPlacementPolicy.class);
+
+    Mockito.when(containerPlacementPolicy.chooseDatanodes(
+        Mockito.anyListOf(DatanodeDetails.class),
+        Mockito.anyInt(), Mockito.anyLong()))
+        .thenAnswer(invocation -> {
+          int count = (int) invocation.getArguments()[1];
+          return IntStream.range(0, count)
+              .mapToObj(i -> randomDatanodeDetails())
+              .collect(Collectors.toList());
+        });
+
+    replicationManager = new ReplicationManager(
+        conf, containerManager, containerPlacementPolicy, eventQueue);
+    replicationManager.start();
+    Thread.sleep(100L);
+  }
+
+  /**
+   * Open containers are not handled by ReplicationManager.
+   * This test case makes sure that ReplicationManager doesn't take
+   * any action on OPEN containers.
+   */
+  @Test
+  public void testOpenContainer() throws SCMException, InterruptedException {
+    final ContainerInfo container = getContainer(LifeCycleState.OPEN);
+    containerStateManager.loadContainer(container);
+    replicationManager.processContainersNow();
+    // Wait for EventQueue to call the event handler
+    Thread.sleep(100L);
+    Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
+  }
+
+  /**
+   * If the container is in CLOSING state we resend the close container
+   * command to all the datanodes.
+   */
+  @Test
+  public void testClosingContainer() throws
+      SCMException, ContainerNotFoundException, InterruptedException {
+    final ContainerInfo container = getContainer(LifeCycleState.CLOSING);
+    final ContainerID id = container.containerID();
+
+    containerStateManager.loadContainer(container);
+
+    // Two replicas in CLOSING state
+    final Set<ContainerReplica> replicas = getReplicas(id, State.CLOSING,
+        randomDatanodeDetails(),
+        randomDatanodeDetails());
+
+    // One replica in OPEN state
+    final DatanodeDetails datanode = randomDatanodeDetails();
+    replicas.addAll(getReplicas(id, State.OPEN, datanode));
+
+    for (ContainerReplica replica : replicas) {
+      containerStateManager.updateContainerReplica(id, replica);
+    }
+
+    final int currentCloseCommandCount = datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
+
+    replicationManager.processContainersNow();
+    // Wait for EventQueue to call the event handler
+    Thread.sleep(100L);
+    Assert.assertEquals(currentCloseCommandCount + 3, datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
+
+    // Update the OPEN replica to CLOSING
+    for (ContainerReplica replica : getReplicas(id, State.CLOSING, datanode)) {
+      containerStateManager.updateContainerReplica(id, replica);
+    }
+
+    replicationManager.processContainersNow();
+    // Wait for EventQueue to call the event handler
+    Thread.sleep(100L);
+    Assert.assertEquals(currentCloseCommandCount + 6, datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
+  }
+
+
+  /**
+   * The container is QUASI_CLOSED but two of the replicas are still in
+   * OPEN state. ReplicationManager should resend the close command to
+   * those datanodes.
+   */
+  @Test
+  public void testQuasiClosedContainerWithTwoOpenReplica() throws
+      SCMException, ContainerNotFoundException, InterruptedException {
+    final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
+    final ContainerID id = container.containerID();
+    final UUID originNodeId = UUID.randomUUID();
+    final ContainerReplica replicaOne = getReplicas(
+        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+    final ContainerReplica replicaTwo = getReplicas(
+        id, State.OPEN, 1000L, originNodeId, randomDatanodeDetails());
+    final DatanodeDetails datanodeDetails = randomDatanodeDetails();
+    final ContainerReplica replicaThree = getReplicas(
+        id, State.OPEN, 1000L, datanodeDetails.getUuid(), datanodeDetails);
+
+    containerStateManager.loadContainer(container);
+    containerStateManager.updateContainerReplica(id, replicaOne);
+    containerStateManager.updateContainerReplica(id, replicaTwo);
+    containerStateManager.updateContainerReplica(id, replicaThree);
+
+    final int currentCloseCommandCount = datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
+    // Two of the replicas are in OPEN state
+    replicationManager.processContainersNow();
+    // Wait for EventQueue to call the event handler
+    Thread.sleep(100L);
+    Assert.assertEquals(currentCloseCommandCount + 2, datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
+    Assert.assertTrue(datanodeCommandHandler.received(
+        SCMCommandProto.Type.closeContainerCommand,
+        replicaTwo.getDatanodeDetails()));
+    Assert.assertTrue(datanodeCommandHandler.received(
+        SCMCommandProto.Type.closeContainerCommand,
+        replicaThree.getDatanodeDetails()));
+  }
+
+  /**
+   * When the container is in QUASI_CLOSED state, all the replicas are
+   * also in QUASI_CLOSED state and there is no
quorum to force close + * the container, ReplicationManager will not do anything. + */ + @Test + public void testHealthyQuasiClosedContainer() throws + SCMException, ContainerNotFoundException, InterruptedException { + final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED); + final ContainerID id = container.containerID(); + final UUID originNodeId = UUID.randomUUID(); + final ContainerReplica replicaOne = getReplicas( + id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails()); + final ContainerReplica replicaTwo = getReplicas( + id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails()); + final ContainerReplica replicaThree = getReplicas( + id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails()); + + containerStateManager.loadContainer(container); + containerStateManager.updateContainerReplica(id, replicaOne); + containerStateManager.updateContainerReplica(id, replicaTwo); + containerStateManager.updateContainerReplica(id, replicaThree); + + // All the QUASI_CLOSED replicas have same originNodeId, so the + // container will not be closed. ReplicationManager should take no action. + replicationManager.processContainersNow(); + // Wait for EventQueue to call the event handler + Thread.sleep(100L); + Assert.assertEquals(0, datanodeCommandHandler.getInvocation()); + } + + /** + * When a container is QUASI_CLOSED and we don't have quorum to force close + * the container, the container should have all the replicas in QUASI_CLOSED + * state, else ReplicationManager will take action. + * + * In this test case we make one of the replica unhealthy, replication manager + * will send delete container command to the datanode which has the unhealthy + * replica. + */ + @Test + public void testQuasiClosedContainerWithUnhealthyReplica() + throws SCMException, ContainerNotFoundException, InterruptedException, + ContainerReplicaNotFoundException { + final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED); + final ContainerID id = container.containerID(); + final UUID originNodeId = UUID.randomUUID(); + final ContainerReplica replicaOne = getReplicas( + id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails()); + final ContainerReplica replicaTwo = getReplicas( + id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails()); + final ContainerReplica replicaThree = getReplicas( + id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails()); + + containerStateManager.loadContainer(container); + containerStateManager.updateContainerReplica(id, replicaOne); + containerStateManager.updateContainerReplica(id, replicaTwo); + containerStateManager.updateContainerReplica(id, replicaThree); + + final int currentDeleteCommandCount = datanodeCommandHandler + .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand); + final int currentReplicateCommandCount = datanodeCommandHandler + .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand); + + // All the QUASI_CLOSED replicas have same originNodeId, so the + // container will not be closed. ReplicationManager should take no action. 
+ replicationManager.processContainersNow(); + // Wait for EventQueue to call the event handler + Thread.sleep(100L); + Assert.assertEquals(0, datanodeCommandHandler.getInvocation()); + + // Make the first replica unhealthy + final ContainerReplica unhealthyReplica = getReplicas( + id, State.UNHEALTHY, 1000L, originNodeId, + replicaOne.getDatanodeDetails()); + containerStateManager.updateContainerReplica(id, unhealthyReplica); + + replicationManager.processContainersNow(); + // Wait for EventQueue to call the event handler + Thread.sleep(100L); + Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler + .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand)); + Assert.assertTrue(datanodeCommandHandler.received( + SCMCommandProto.Type.deleteContainerCommand, + replicaOne.getDatanodeDetails())); + + // Now we will delete the unhealthy replica from in-memory. + containerStateManager.removeContainerReplica(id, replicaOne); + + // The container is under replicated as unhealthy replica is removed + replicationManager.processContainersNow(); + // Wait for EventQueue to call the event handler + Thread.sleep(100L); + + // We should get replicate command + Assert.assertEquals(currentReplicateCommandCount + 1, + datanodeCommandHandler.getInvocationCount( + SCMCommandProto.Type.replicateContainerCommand)); + } + + /** + * When a QUASI_CLOSED container is over replicated, ReplicationManager + * deletes the excess replicas. + */ + @Test + public void testOverReplicatedQuasiClosedContainer() throws + SCMException, ContainerNotFoundException, InterruptedException { + final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED); + final ContainerID id = container.containerID(); + final UUID originNodeId = UUID.randomUUID(); + final ContainerReplica replicaOne = getReplicas( + id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails()); + final ContainerReplica replicaTwo = getReplicas( + id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails()); + final ContainerReplica replicaThree = getReplicas( + id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails()); + final ContainerReplica replicaFour = getReplicas( + id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails()); + + containerStateManager.loadContainer(container); + containerStateManager.updateContainerReplica(id, replicaOne); + containerStateManager.updateContainerReplica(id, replicaTwo); + containerStateManager.updateContainerReplica(id, replicaThree); + containerStateManager.updateContainerReplica(id, replicaFour); + + final int currentDeleteCommandCount = datanodeCommandHandler + .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand); + + replicationManager.processContainersNow(); + // Wait for EventQueue to call the event handler + Thread.sleep(100L); + Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler + .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand)); + } + + /** + * When a QUASI_CLOSED container is over replicated, ReplicationManager + * deletes the excess replicas. While choosing the replica for deletion + * ReplicationManager should prioritize unhealthy replica over QUASI_CLOSED + * replica. 
+ */ + @Test + public void testOverReplicatedQuasiClosedContainerWithUnhealthyReplica() + throws SCMException, ContainerNotFoundException, InterruptedException { + final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED); + final ContainerID id = container.containerID(); + final UUID originNodeId = UUID.randomUUID(); + final ContainerReplica replicaOne = getReplicas( + id, State.UNHEALTHY, 1000L, originNodeId, randomDatanodeDetails()); + final ContainerReplica replicaTwo = getReplicas( + id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails()); + final ContainerReplica replicaThree = getReplicas( + id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails()); + final ContainerReplica replicaFour = getReplicas( + id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails()); + + containerStateManager.loadContainer(container); + containerStateManager.updateContainerReplica(id, replicaOne); + containerStateManager.updateContainerReplica(id, replicaTwo); + containerStateManager.updateContainerReplica(id, replicaThree); + containerStateManager.updateContainerReplica(id, replicaFour); + + final int currentDeleteCommandCount = datanodeCommandHandler + .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand); + + replicationManager.processContainersNow(); + // Wait for EventQueue to call the event handler + Thread.sleep(100L); + Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler + .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand)); + Assert.assertTrue(datanodeCommandHandler.received( + SCMCommandProto.Type.deleteContainerCommand, + replicaOne.getDatanodeDetails())); + } + + /** + * ReplicationManager should replicate an QUASI_CLOSED replica if it is + * under replicated. + */ + @Test + public void testUnderReplicatedQuasiClosedContainer() throws + SCMException, ContainerNotFoundException, InterruptedException { + final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED); + final ContainerID id = container.containerID(); + final UUID originNodeId = UUID.randomUUID(); + final ContainerReplica replicaOne = getReplicas( + id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails()); + final ContainerReplica replicaTwo = getReplicas( + id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails()); + + containerStateManager.loadContainer(container); + containerStateManager.updateContainerReplica(id, replicaOne); + containerStateManager.updateContainerReplica(id, replicaTwo); + + final int currentReplicateCommandCount = datanodeCommandHandler + .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand); + + replicationManager.processContainersNow(); + // Wait for EventQueue to call the event handler + Thread.sleep(100L); + Assert.assertEquals(currentReplicateCommandCount + 1, + datanodeCommandHandler.getInvocationCount( + SCMCommandProto.Type.replicateContainerCommand)); + } + + /** + * When a QUASI_CLOSED container is under replicated, ReplicationManager + * should re-replicate it. If there are any unhealthy replica, it has to + * be deleted. + * + * In this test case, the container is QUASI_CLOSED and is under replicated + * and also has an unhealthy replica. + * + * In the first iteration of ReplicationManager, it should re-replicate + * the container so that it has enough replicas. + * + * In the second iteration, ReplicationManager should delete the unhealthy + * replica. 
+  /**
+   * When a QUASI_CLOSED container is under replicated, ReplicationManager
+   * should re-replicate it. Any unhealthy replica has to be deleted.
+   *
+   * In this test case, the container is QUASI_CLOSED, under replicated,
+   * and also has an unhealthy replica.
+   *
+   * In the first iteration of ReplicationManager, it should re-replicate
+   * the container so that it has enough replicas.
+   *
+   * In the second iteration, ReplicationManager should delete the unhealthy
+   * replica.
+   *
+   * In the third iteration, ReplicationManager will re-replicate again, as
+   * the container has become under replicated once more after the unhealthy
+   * replica was deleted.
+   */
+  @Test
+  public void testUnderReplicatedQuasiClosedContainerWithUnhealthyReplica()
+      throws SCMException, ContainerNotFoundException, InterruptedException,
+      ContainerReplicaNotFoundException {
+    final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
+    final ContainerID id = container.containerID();
+    final UUID originNodeId = UUID.randomUUID();
+    final ContainerReplica replicaOne = getReplicas(
+        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+    final ContainerReplica replicaTwo = getReplicas(
+        id, State.UNHEALTHY, 1000L, originNodeId, randomDatanodeDetails());
+
+    containerStateManager.loadContainer(container);
+    containerStateManager.updateContainerReplica(id, replicaOne);
+    containerStateManager.updateContainerReplica(id, replicaTwo);
+
+    final int currentReplicateCommandCount = datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
+    final int currentDeleteCommandCount = datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
+
+    replicationManager.processContainersNow();
+    // Wait for EventQueue to call the event handler
+    Thread.sleep(100L);
+    Assert.assertEquals(currentReplicateCommandCount + 1,
+        datanodeCommandHandler.getInvocationCount(
+            SCMCommandProto.Type.replicateContainerCommand));
+
+    Optional<CommandForDatanode> replicateCommand = datanodeCommandHandler
+        .getReceivedCommands().stream()
+        .filter(c -> c.getCommand().getType()
+            .equals(SCMCommandProto.Type.replicateContainerCommand))
+        .findFirst();
+
+    Assert.assertTrue(replicateCommand.isPresent());
+
+    DatanodeDetails newNode = createDatanodeDetails(
+        replicateCommand.get().getDatanodeId());
+    ContainerReplica newReplica = getReplicas(
+        id, State.QUASI_CLOSED, 1000L, originNodeId, newNode);
+    containerStateManager.updateContainerReplica(id, newReplica);
+
+    /*
+     * We have reported the new replica to SCM; in the next
+     * ReplicationManager iteration it should delete the unhealthy replica.
+     */
+
+    replicationManager.processContainersNow();
+    // Wait for EventQueue to call the event handler
+    Thread.sleep(100L);
+    Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
+    // ReplicaTwo, the unhealthy one, should be deleted
+    Assert.assertTrue(datanodeCommandHandler.received(
+        SCMCommandProto.Type.deleteContainerCommand,
+        replicaTwo.getDatanodeDetails()));
+
+    containerStateManager.removeContainerReplica(id, replicaTwo);
+
+    /*
+     * We have now removed the unhealthy replica; the next iteration of
+     * ReplicationManager should re-replicate the container as it is
+     * under replicated now.
+     */
+
+    replicationManager.processContainersNow();
+    // Wait for EventQueue to call the event handler
+    Thread.sleep(100L);
+    Assert.assertEquals(currentReplicateCommandCount + 2,
+        datanodeCommandHandler.getInvocationCount(
+            SCMCommandProto.Type.replicateContainerCommand));
+  }
+
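+  /*
+   * A minimal sketch, not part of the original patch, of the force-close
+   * precondition exercised by the next test: a QUASI_CLOSED container can
+   * be closed once more than half of the expected replicas are
+   * QUASI_CLOSED with distinct origin node ids. The helper below is
+   * hypothetical; the real check lives inside ReplicationManager.
+   */
+  private boolean canForceClose(final Set<ContainerReplica> replicas) {
+    final long uniqueQuasiClosedOrigins = replicas.stream()
+        .filter(r -> r.getState() == State.QUASI_CLOSED)
+        .map(ContainerReplica::getOriginDatanodeId)
+        .distinct()
+        .count();
+    // Strictly more than 50% of the replication factor is required,
+    // i.e. at least two out of three replicas.
+    return uniqueQuasiClosedOrigins > REPLICATION_FACTOR / 2;
+  }
+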
+  /**
+   * When a container is QUASI_CLOSED and more than 50% of its replicas
+   * are in QUASI_CLOSED state with unique origin node ids,
+   * ReplicationManager should force close the replica(s) with the
+   * highest BCSID.
+   */
+  @Test
+  public void testQuasiClosedToClosed() throws
+      SCMException, ContainerNotFoundException, InterruptedException {
+    final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
+    final ContainerID id = container.containerID();
+    final Set<ContainerReplica> replicas = getReplicas(id, State.QUASI_CLOSED,
+        randomDatanodeDetails(),
+        randomDatanodeDetails(),
+        randomDatanodeDetails());
+    containerStateManager.loadContainer(container);
+    for (ContainerReplica replica : replicas) {
+      containerStateManager.updateContainerReplica(id, replica);
+    }
+
+    final int currentCloseCommandCount = datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
+
+    replicationManager.processContainersNow();
+    // Wait for EventQueue to call the event handler
+    Thread.sleep(100L);
+
+    // All the replicas have the same BCSID, so all of them will be closed.
+    Assert.assertEquals(currentCloseCommandCount + 3, datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
+  }
+
+  /**
+   * ReplicationManager should not take any action if the container is
+   * CLOSED and healthy.
+   */
+  @Test
+  public void testHealthyClosedContainer()
+      throws SCMException, ContainerNotFoundException, InterruptedException {
+    final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
+    final ContainerID id = container.containerID();
+    final Set<ContainerReplica> replicas = getReplicas(id, State.CLOSED,
+        randomDatanodeDetails(),
+        randomDatanodeDetails(),
+        randomDatanodeDetails());
+
+    containerStateManager.loadContainer(container);
+    for (ContainerReplica replica : replicas) {
+      containerStateManager.updateContainerReplica(id, replica);
+    }
+
+    replicationManager.processContainersNow();
+    // Wait for EventQueue to call the event handler
+    Thread.sleep(100L);
+    Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
+  }
+
+  @After
+  public void teardown() throws IOException {
+    containerStateManager.close();
+    replicationManager.stop();
+  }
+
+  private class DatanodeCommandHandler implements
+      EventHandler<CommandForDatanode> {
+
+    private AtomicInteger invocation = new AtomicInteger(0);
+    private Map<SCMCommandProto.Type, AtomicInteger> commandInvocation =
+        new HashMap<>();
+    private List<CommandForDatanode> commands = new ArrayList<>();
+
+    @Override
+    public void onMessage(final CommandForDatanode command,
+        final EventPublisher publisher) {
+      final SCMCommandProto.Type type = command.getCommand().getType();
+      commandInvocation.computeIfAbsent(type, k -> new AtomicInteger(0));
+      commandInvocation.get(type).incrementAndGet();
+      invocation.incrementAndGet();
+      commands.add(command);
+    }
+
+    private int getInvocation() {
+      return invocation.get();
+    }
+
+    private int getInvocationCount(SCMCommandProto.Type type) {
+      return commandInvocation.containsKey(type) ?
+          commandInvocation.get(type).get() : 0;
+    }
+
+    private List<CommandForDatanode> getReceivedCommands() {
+      return commands;
+    }
+
+    /**
+     * Returns true if the command handler has received the given
+     * command type for the provided datanode.
+     *
+     * @param type Command Type
+     * @param datanode DatanodeDetails
+     * @return True if command was received, false otherwise
+     */
+    private boolean received(final SCMCommandProto.Type type,
+        final DatanodeDetails datanode) {
+      return commands.stream().anyMatch(dc ->
+          dc.getCommand().getType().equals(type) &&
+          dc.getDatanodeId().equals(datanode.getUuid()));
+    }
+  }
+
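+  /*
+   * A minimal sketch, not part of the original patch, of how the
+   * DatanodeCommandHandler above is expected to be wired up; the actual
+   * registration happens in the @Before setup of this test class.
+   * SCMEvents.DATANODE_COMMAND is the event that ReplicationManager
+   * publishes its CommandForDatanode payloads on.
+   */
+  private DatanodeCommandHandler registerCommandHandler(
+      final EventQueue queue) {
+    final DatanodeCommandHandler handler = new DatanodeCommandHandler();
+    // Every CommandForDatanode fired by ReplicationManager is routed to
+    // this handler, which records it for the assertions in the tests.
+    queue.addHandler(SCMEvents.DATANODE_COMMAND, handler);
+    return handler;
+  }
+}
\ No newline at end of file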