HDDS-1205. Refactor ReplicationManager to handle QUASI_CLOSED containers. Contributed by Nanda kumar. (#620)

Arpit Agarwal 2019-03-22 14:36:29 -07:00 committed by GitHub
parent 509b20b292
commit f854a89190
11 changed files with 1485 additions and 54 deletions

View File

@@ -348,6 +348,18 @@ public final class ScmConfigKeys {
public static final String HDDS_SCM_WATCHER_TIMEOUT_DEFAULT =
"10m";
public static final String HDDS_SCM_REPLICATION_THREAD_INTERVAL =
"hdds.scm.replication.thread.interval";
public static final String HDDS_SCM_REPLICATION_THREAD_INTERVAL_DEFAULT =
"5m";
public static final String HDDS_SCM_REPLICATION_EVENT_TIMEOUT =
"hdds.scm.replication.event.timeout";
public static final String HDDS_SCM_REPLICATION_EVENT_TIMEOUT_DEFAULT =
"10m";
public static final String
HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY =
"hdds.scm.http.kerberos.principal";

View File

@@ -2357,4 +2357,24 @@
Request to flush the OM DB before taking checkpoint snapshot.
</description>
</property>
<property>
<name>hdds.scm.replication.thread.interval</name>
<value>5m</value>
<tag>OZONE, SCM</tag>
<description>
There is a replication monitor thread running inside SCM which
takes care of replicating the containers in the cluster. This
property is used to configure the interval at which that thread
runs.
</description>
</property>
<property>
<name>hdds.scm.replication.event.timeout</name>
<value>10m</value>
<tag>OZONE, SCM</tag>
<description>
Timeout for the container replication/deletion commands sent
to datanodes. After this timeout the command will be retried.
</description>
</property>
</configuration>
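For reference, these defaults can be overridden in ozone-site.xml; a minimal sketch (the 2m and 20m values are illustrative only, not part of this change):

<property>
  <name>hdds.scm.replication.thread.interval</name>
  <value>2m</value>
</property>
<property>
  <name>hdds.scm.replication.event.timeout</name>
  <value>20m</value>
</property>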

View File

@@ -34,6 +34,14 @@
*/
public interface ContainerManager extends Closeable {
/**
* Returns all the container Ids managed by ContainerManager.
*
* @return Set of ContainerID
*/
Set<ContainerID> getContainerIDs();
/**
* Returns all the containers managed by ContainerManager.
*

View File

@@ -0,0 +1,748 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.container;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.GeneratedMessage;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.lock.LockManager;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Time;
import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
* Replication Manager (RM) is the one which is responsible for making sure
* that the containers are properly replicated. Replication Manager deals only
* with Quasi Closed / Closed containers.
*/
public class ReplicationManager {
private static final Logger LOG =
LoggerFactory.getLogger(ReplicationManager.class);
/**
* Reference to the ContainerManager.
*/
private final ContainerManager containerManager;
/**
* PlacementPolicy which is used to identify where a container
* should be replicated.
*/
private final ContainerPlacementPolicy containerPlacement;
/**
* EventPublisher to fire Replicate and Delete container events.
*/
private final EventPublisher eventPublisher;
/**
* Used for locking a container using its ID while processing it.
*/
private final LockManager<ContainerID> lockManager;
/**
* This is used for tracking container replication commands which are issued
* by ReplicationManager and not yet complete.
*/
private final Map<ContainerID, List<InflightAction>> inflightReplication;
/**
* This is used for tracking container deletion commands which are issued
* by ReplicationManager and not yet complete.
*/
private final Map<ContainerID, List<InflightAction>> inflightDeletion;
/**
* ReplicationMonitor thread is the one which wakes up at configured
* interval and processes all the containers.
*/
private final Thread replicationMonitor;
/**
* The frequency at which the ReplicationMonitor thread should run.
*/
private final long interval;
/**
* Timeout for container replication & deletion command issued by
* ReplicationManager.
*/
private final long eventTimeout;
/**
* Flag used for checking if the ReplicationMonitor thread is running or
* not.
*/
private volatile boolean running;
/**
* Constructs ReplicationManager instance with the given configuration.
*
* @param conf OzoneConfiguration
* @param containerManager ContainerManager
* @param containerPlacement ContainerPlacementPolicy
* @param eventPublisher EventPublisher
*/
public ReplicationManager(final Configuration conf,
final ContainerManager containerManager,
final ContainerPlacementPolicy containerPlacement,
final EventPublisher eventPublisher) {
this.containerManager = containerManager;
this.containerPlacement = containerPlacement;
this.eventPublisher = eventPublisher;
this.lockManager = new LockManager<>(conf);
this.inflightReplication = new HashMap<>();
this.inflightDeletion = new HashMap<>();
this.replicationMonitor = new Thread(this::run);
this.replicationMonitor.setName("ReplicationMonitor");
this.replicationMonitor.setDaemon(true);
this.interval = conf.getTimeDuration(
ScmConfigKeys.HDDS_SCM_REPLICATION_THREAD_INTERVAL,
ScmConfigKeys.HDDS_SCM_REPLICATION_THREAD_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
this.eventTimeout = conf.getTimeDuration(
ScmConfigKeys.HDDS_SCM_REPLICATION_EVENT_TIMEOUT,
ScmConfigKeys.HDDS_SCM_REPLICATION_EVENT_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
this.running = false;
}
/**
* Starts Replication Monitor thread.
*/
public synchronized void start() {
if (!running) {
LOG.info("Starting Replication Monitor Thread.");
running = true;
replicationMonitor.start();
} else {
LOG.info("Replication Monitor Thread is already running.");
}
}
/**
* Process all the containers immediately.
*/
@VisibleForTesting
@SuppressFBWarnings(value="NN_NAKED_NOTIFY",
justification="Used only for testing")
synchronized void processContainersNow() {
notify();
}
/**
* Stops Replication Monitor thread.
*/
public synchronized void stop() {
if (running) {
LOG.info("Stopping Replication Monitor Thread.");
running = false;
notify();
} else {
LOG.info("Replication Monitor Thread is not running.");
}
}
/**
* ReplicationMonitor thread runnable. This wakes up at configured
* interval and processes all the containers in the system.
*/
private synchronized void run() {
try {
while (running) {
final long start = Time.monotonicNow();
final Set<ContainerID> containerIds =
containerManager.getContainerIDs();
containerIds.forEach(this::processContainer);
LOG.info("Replication Monitor Thread took {} milliseconds for" +
" processing {} containers.", Time.monotonicNow() - start,
containerIds.size());
wait(interval);
}
} catch (Throwable t) {
// When we get runtime exception, we should terminate SCM.
LOG.error("Exception in Replication Monitor Thread.", t);
ExitUtil.terminate(1, t);
}
}
/**
* Process the given container.
*
* @param id ContainerID
*/
private void processContainer(ContainerID id) {
lockManager.lock(id);
try {
final ContainerInfo container = containerManager.getContainer(id);
final Set<ContainerReplica> replicas = containerManager
.getContainerReplicas(container.containerID());
final LifeCycleState state = container.getState();
/*
* We don't take any action if the container is in OPEN state.
*/
if (state == LifeCycleState.OPEN) {
return;
}
/*
* If the container is in CLOSING state, the replicas can either
* be in OPEN or in CLOSING state. In both of these cases
* we have to resend the close container command to the datanodes.
*/
if (state == LifeCycleState.CLOSING) {
replicas.forEach(replica -> sendCloseCommand(
container, replica.getDatanodeDetails(), false));
return;
}
/*
* If the container is in QUASI_CLOSED state, check and close the
* container if possible.
*/
if (state == LifeCycleState.QUASI_CLOSED &&
canForceCloseContainer(container, replicas)) {
forceCloseContainer(container, replicas);
return;
}
/*
* Before processing the container we have to reconcile the
* inflightReplication and inflightDeletion actions.
*
* We remove the entry from inflightReplication and inflightDeletion
* list, if the operation is completed or if it has timed out.
*/
updateInflightAction(container, inflightReplication,
action -> replicas.stream()
.anyMatch(r -> r.getDatanodeDetails().equals(action.datanode)));
updateInflightAction(container, inflightDeletion,
action -> replicas.stream()
.noneMatch(r -> r.getDatanodeDetails().equals(action.datanode)));
/*
* We don't have to take any action if the container is healthy.
*
* According to ReplicationMonitor, a container is considered healthy if
* the container is either in QUASI_CLOSED or in CLOSED state and has
* exact number of replicas in the same state.
*/
if (isContainerHealthy(container, replicas)) {
return;
}
/*
* Check if the container is under replicated and take appropriate
* action.
*/
if (isContainerUnderReplicated(container, replicas)) {
handleUnderReplicatedContainer(container, replicas);
return;
}
/*
* Check if the container is over replicated and take appropriate
* action.
*/
if (isContainerOverReplicated(container, replicas)) {
handleOverReplicatedContainer(container, replicas);
return;
}
/*
* The container is neither under nor over replicated and the container
* is not healthy. This means that the container has an unhealthy or
* corrupted replica.
*/
handleUnstableContainer(container, replicas);
} catch (ContainerNotFoundException ex) {
LOG.warn("Missing container {}.", id);
} finally {
lockManager.unlock(id);
}
}
/**
* Reconciles the InflightActions for a given container.
*
* @param container Container to update
* @param inflightActions inflightReplication (or) inflightDeletion
* @param filter filter to check if the operation is completed
*/
private void updateInflightAction(final ContainerInfo container,
final Map<ContainerID, List<InflightAction>> inflightActions,
final Predicate<InflightAction> filter) {
final ContainerID id = container.containerID();
final long deadline = Time.monotonicNow() - eventTimeout;
if (inflightActions.containsKey(id)) {
final List<InflightAction> actions = inflightActions.get(id);
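// First drop actions that were issued before the deadline (they have
// timed out and the command will be re-issued), then drop actions that
// the filter reports as completed.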
actions.removeIf(action -> action.time < deadline);
actions.removeIf(filter);
if (actions.isEmpty()) {
inflightActions.remove(id);
}
}
}
/**
* Returns true if the container is healthy according to ReplicationMonitor.
*
* According to ReplicationMonitor, a container is considered healthy if
* it has exact number of replicas in the same state as the container.
*
* @param container Container to check
* @param replicas Set of ContainerReplicas
* @return true if the container is healthy, false otherwise
*/
private boolean isContainerHealthy(final ContainerInfo container,
final Set<ContainerReplica> replicas) {
return container.getReplicationFactor().getNumber() == replicas.size() &&
replicas.stream().allMatch(
r -> compareState(container.getState(), r.getState()));
}
/**
* Checks if the container is under replicated or not.
*
* @param container Container to check
* @param replicas Set of ContainerReplicas
* @return true if the container is under replicated, false otherwise
*/
private boolean isContainerUnderReplicated(final ContainerInfo container,
final Set<ContainerReplica> replicas) {
return container.getReplicationFactor().getNumber() >
getReplicaCount(container.containerID(), replicas);
}
/**
* Checks if the container is over replicated or not.
*
* @param container Container to check
* @param replicas Set of ContainerReplicas
* @return true if the container is over replicated, false otherwise
*/
private boolean isContainerOverReplicated(final ContainerInfo container,
final Set<ContainerReplica> replicas) {
return container.getReplicationFactor().getNumber() <
getReplicaCount(container.containerID(), replicas);
}
/**
* Returns the replication count of the given container. This also
* considers inflight replication and deletion.
*
* @param id ContainerID
* @param replicas Set of existing replicas
* @return number of estimated replicas for this container
*/
private int getReplicaCount(final ContainerID id,
final Set<ContainerReplica> replicas) {
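// Estimated count = existing replicas + inflight replications
// - inflight deletions; e.g. 3 existing replicas with one inflight
// deletion count as 2.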
return replicas.size()
+ inflightReplication.getOrDefault(id, Collections.emptyList()).size()
- inflightDeletion.getOrDefault(id, Collections.emptyList()).size();
}
/**
* Returns true if more than 50% of the container replicas with unique
* originNodeId are in QUASI_CLOSED state.
*
* @param container Container to check
* @param replicas Set of ContainerReplicas
* @return true if we can force close the container, false otherwise
*/
private boolean canForceCloseContainer(final ContainerInfo container,
final Set<ContainerReplica> replicas) {
Preconditions.assertTrue(container.getState() ==
LifeCycleState.QUASI_CLOSED);
final int replicationFactor = container.getReplicationFactor().getNumber();
final long uniqueQuasiClosedReplicaCount = replicas.stream()
.filter(r -> r.getState() == State.QUASI_CLOSED)
.map(ContainerReplica::getOriginDatanodeId)
.distinct()
.count();
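// Example: with a replication factor of 3, at least two QUASI_CLOSED
// replicas with distinct origin datanodes are required (2 > 3 / 2).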
return uniqueQuasiClosedReplicaCount > (replicationFactor / 2);
}
/**
* Force close the container replica(s) with highest sequence Id.
*
* <p>
* Note: We should force close the container only if >50% (quorum)
* of replicas with unique originNodeId are in QUASI_CLOSED state.
* </p>
*
* @param container ContainerInfo
* @param replicas Set of ContainerReplicas
*/
private void forceCloseContainer(final ContainerInfo container,
final Set<ContainerReplica> replicas) {
Preconditions.assertTrue(container.getState() ==
LifeCycleState.QUASI_CLOSED);
final List<ContainerReplica> quasiClosedReplicas = replicas.stream()
.filter(r -> r.getState() == State.QUASI_CLOSED)
.collect(Collectors.toList());
final Long sequenceId = quasiClosedReplicas.stream()
.map(ContainerReplica::getSequenceId)
.max(Long::compare)
.orElse(-1L);
LOG.info("Force closing container {} with BCSID {}," +
" which is in QUASI_CLOSED state.",
container.containerID(), sequenceId);
quasiClosedReplicas.stream()
.filter(r -> sequenceId != -1L)
.filter(replica -> replica.getSequenceId().equals(sequenceId))
.forEach(replica -> sendCloseCommand(
container, replica.getDatanodeDetails(), true));
}
/**
* If the given container is under replicated, identify a new set of
* datanode(s) to replicate the container using ContainerPlacementPolicy
* and send replicate container command to the identified datanode(s).
*
* @param container ContainerInfo
* @param replicas Set of ContainerReplicas
*/
private void handleUnderReplicatedContainer(final ContainerInfo container,
final Set<ContainerReplica> replicas) {
try {
final ContainerID id = container.containerID();
final List<DatanodeDetails> deletionInFlight = inflightDeletion
.getOrDefault(id, Collections.emptyList())
.stream()
.map(action -> action.datanode)
.collect(Collectors.toList());
final List<DatanodeDetails> source = replicas.stream()
.filter(r ->
r.getState() == State.QUASI_CLOSED ||
r.getState() == State.CLOSED)
.filter(r -> !deletionInFlight.contains(r.getDatanodeDetails()))
.sorted((r1, r2) -> r2.getSequenceId().compareTo(r1.getSequenceId()))
.map(ContainerReplica::getDatanodeDetails)
.collect(Collectors.toList());
if (source.size() > 0) {
final int replicationFactor = container
.getReplicationFactor().getNumber();
final int delta = replicationFactor - getReplicaCount(id, replicas);
final List<DatanodeDetails> selectedDatanodes = containerPlacement
.chooseDatanodes(source, delta, container.getUsedBytes());
LOG.info("Container {} is under replicated. Expected replica count" +
" is {}, but found {}.", id, replicationFactor,
replicationFactor - delta);
for (DatanodeDetails datanode : selectedDatanodes) {
sendReplicateCommand(container, datanode, source);
}
} else {
LOG.warn("Cannot replicate container {}, no healthy replica found.",
container.containerID());
}
} catch (IOException ex) {
LOG.warn("Exception while replicating container {}.",
container.getContainerID(), ex);
}
}
/**
* If the given container is over replicated, identify the datanode(s)
* to delete the container and send delete container command to the
* identified datanode(s).
*
* @param container ContainerInfo
* @param replicas Set of ContainerReplicas
*/
private void handleOverReplicatedContainer(final ContainerInfo container,
final Set<ContainerReplica> replicas) {
final ContainerID id = container.containerID();
final int replicationFactor = container.getReplicationFactor().getNumber();
// Don't consider inflight replication while calculating the excess here.
final int excess = replicas.size() - replicationFactor -
inflightDeletion.getOrDefault(id, Collections.emptyList()).size();
if (excess > 0) {
LOG.info("Container {} is over replicated. Expected replica count" +
" is {}, but found {}.", id, replicationFactor,
replicationFactor + excess);
final Map<UUID, ContainerReplica> uniqueReplicas =
new LinkedHashMap<>();
replicas.stream()
.filter(r -> compareState(container.getState(), r.getState()))
.forEach(r -> uniqueReplicas
.putIfAbsent(r.getOriginDatanodeId(), r));
// Retain one healthy replica per origin node Id.
final List<ContainerReplica> eligibleReplicas = new ArrayList<>(replicas);
eligibleReplicas.removeAll(uniqueReplicas.values());
final List<ContainerReplica> unhealthyReplicas = eligibleReplicas
.stream()
.filter(r -> !compareState(container.getState(), r.getState()))
.collect(Collectors.toList());
//Move the unhealthy replicas to the front of eligible replicas to delete
eligibleReplicas.removeAll(unhealthyReplicas);
eligibleReplicas.addAll(0, unhealthyReplicas);
for (int i = 0; i < excess; i++) {
sendDeleteCommand(container,
eligibleReplicas.get(i).getDatanodeDetails(), true);
}
}
}
/**
* Handles unstable container.
* A container is inconsistent if any of the replica states don't
* match the container state. We have to take appropriate action
* based on state of the replica.
*
* @param container ContainerInfo
* @param replicas Set of ContainerReplicas
*/
private void handleUnstableContainer(final ContainerInfo container,
final Set<ContainerReplica> replicas) {
// Find unhealthy replicas
List<ContainerReplica> unhealthyReplicas = replicas.stream()
.filter(r -> !compareState(container.getState(), r.getState()))
.collect(Collectors.toList());
Iterator<ContainerReplica> iterator = unhealthyReplicas.iterator();
while (iterator.hasNext()) {
final ContainerReplica replica = iterator.next();
final State state = replica.getState();
if (state == State.OPEN || state == State.CLOSING) {
sendCloseCommand(container, replica.getDatanodeDetails(), false);
iterator.remove();
}
if (state == State.QUASI_CLOSED) {
// Send force close command if the BCSID matches
if (container.getSequenceId() == replica.getSequenceId()) {
sendCloseCommand(container, replica.getDatanodeDetails(), true);
iterator.remove();
}
}
}
// Now we are left with the replicas which are either unhealthy or
// the BCSID doesn't match. These replicas should be deleted.
/*
* If we have unhealthy replicas, deleting one makes the container
* under replicated, and the healthy copy is then re-replicated.
*
* We also make sure that we delete only one unhealthy replica at a time.
*
* If there are two unhealthy replicas:
* - Delete first unhealthy replica
* - Re-replicate the healthy copy
* - Delete second unhealthy replica
* - Re-replicate the healthy copy
*
* Note: Only one action will be executed in a single ReplicationMonitor
* iteration. So to complete all the above actions we need four
* ReplicationMonitor iterations.
*/
unhealthyReplicas.stream().findFirst().ifPresent(replica ->
sendDeleteCommand(container, replica.getDatanodeDetails(), false));
}
/**
* Sends close container command for the given container to the given
* datanode.
*
* @param container Container to be closed
* @param datanode The datanode on which the container
* has to be closed
* @param force Should be set to true if we want to close a
* QUASI_CLOSED container
*/
private void sendCloseCommand(final ContainerInfo container,
final DatanodeDetails datanode,
final boolean force) {
LOG.info("Sending close container command for container {}" +
" to datanode {}.", container.containerID(), datanode);
CloseContainerCommand closeContainerCommand =
new CloseContainerCommand(container.getContainerID(),
container.getPipelineID(), force);
eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
new CommandForDatanode<>(datanode.getUuid(), closeContainerCommand));
}
/**
* Sends replicate container command for the given container to the given
* datanode.
*
* @param container Container to be replicated
* @param datanode The destination datanode to replicate
* @param sources List of source nodes from where we can replicate
*/
private void sendReplicateCommand(final ContainerInfo container,
final DatanodeDetails datanode,
final List<DatanodeDetails> sources) {
LOG.info("Sending replicate container command for container {}" +
" to datanode {}", container.containerID(), datanode);
final ContainerID id = container.containerID();
final ReplicateContainerCommand replicateCommand =
new ReplicateContainerCommand(id.getId(), sources);
inflightReplication.computeIfAbsent(id, k -> new ArrayList<>());
sendAndTrackDatanodeCommand(datanode, replicateCommand,
action -> inflightReplication.get(id).add(action));
}
/**
* Sends delete container command for the given container to the given
* datanode.
*
* @param container Container to be deleted
* @param datanode The datanode on which the replica should be deleted
* @param force Should be set to true to delete an OPEN replica
*/
private void sendDeleteCommand(final ContainerInfo container,
final DatanodeDetails datanode,
final boolean force) {
LOG.info("Sending delete container command for container {}" +
" to datanode {}", container.containerID(), datanode);
final ContainerID id = container.containerID();
final DeleteContainerCommand deleteCommand =
new DeleteContainerCommand(id.getId(), force);
inflightDeletion.computeIfAbsent(id, k -> new ArrayList<>());
sendAndTrackDatanodeCommand(datanode, deleteCommand,
action -> inflightDeletion.get(id).add(action));
}
/**
* Creates CommandForDatanode with the given SCMCommand and fires
* DATANODE_COMMAND event to event queue.
*
* Tracks the command using the given tracker.
*
* @param datanode Datanode to which the command has to be sent
* @param command SCMCommand to be sent
* @param tracker Tracker which tracks the inflight actions
* @param <T> Type of SCMCommand
*/
private <T extends GeneratedMessage> void sendAndTrackDatanodeCommand(
final DatanodeDetails datanode,
final SCMCommand<T> command,
final Consumer<InflightAction> tracker) {
final CommandForDatanode<T> datanodeCommand =
new CommandForDatanode<>(datanode.getUuid(), command);
eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
tracker.accept(new InflightAction(datanode, Time.monotonicNow()));
}
/**
* Compares the container state with the replica state.
*
* @param containerState ContainerState
* @param replicaState ReplicaState
* @return true if the state matches, false otherwise
*/
private static boolean compareState(final LifeCycleState containerState,
final State replicaState) {
switch (containerState) {
case OPEN:
return replicaState == State.OPEN;
case CLOSING:
return replicaState == State.CLOSING;
case QUASI_CLOSED:
return replicaState == State.QUASI_CLOSED;
case CLOSED:
return replicaState == State.CLOSED;
case DELETING:
return false;
case DELETED:
return false;
default:
return false;
}
}
/**
* Wrapper class to hold the InflightAction with its start time.
*/
private static final class InflightAction {
private final DatanodeDetails datanode;
private final long time;
private InflightAction(final DatanodeDetails datanode,
final long time) {
this.datanode = datanode;
this.time = time;
}
}
}
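For reference, the sketch below shows how the new class is constructed and started; it mirrors the wiring used in TestReplicationManager further down, with the ContainerManager and placement policy mocked out (a real SCM would pass its own implementations):

final Configuration conf = new OzoneConfiguration();
final ContainerManager containerManager = Mockito.mock(ContainerManager.class);
final ContainerPlacementPolicy placementPolicy =
    Mockito.mock(ContainerPlacementPolicy.class);
final EventQueue eventQueue = new EventQueue();
// Interval and timeout are read from hdds.scm.replication.thread.interval
// and hdds.scm.replication.event.timeout.
final ReplicationManager replicationManager = new ReplicationManager(
    conf, containerManager, placementPolicy, eventQueue);
replicationManager.start();  // spawns the ReplicationMonitor daemon thread
// ...
replicationManager.stop();   // wakes the monitor thread so it can exit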

View File

@@ -131,6 +131,16 @@ public ContainerStateManager getContainerStateManager() {
return containerStateManager;
}
@Override
public Set<ContainerID> getContainerIDs() {
lock.lock();
try {
return containerStateManager.getAllContainerIDs();
} finally {
lock.unlock();
}
}
@Override
public List<ContainerInfo> getContainers() {
lock.lock();

View File

@@ -35,7 +35,6 @@
import java.util.HashSet;
import java.util.Set;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;
@@ -107,9 +106,9 @@ public ContainerStateMap() {
this.ownerMap = new ContainerAttribute<>();
this.factorMap = new ContainerAttribute<>();
this.typeMap = new ContainerAttribute<>();
-this.containerMap = new HashMap<>();
+this.containerMap = new ConcurrentHashMap<>();
this.lock = new ReentrantReadWriteLock();
-this.replicaMap = new HashMap<>();
+this.replicaMap = new ConcurrentHashMap<>();
this.resultCache = new ConcurrentHashMap<>();
}
@@ -208,7 +207,7 @@ public Set<ContainerReplica> getContainerReplicas(
try {
checkIfContainerExist(containerID);
return Collections
-.unmodifiableSet(new HashSet<>(replicaMap.get(containerID)));
+.unmodifiableSet(replicaMap.get(containerID));
} finally {
lock.readLock().unlock();
}
@@ -342,7 +341,7 @@ public void updateState(ContainerID containerID, LifeCycleState currentState,
}
public Set<ContainerID> getAllContainerIDs() {
-return containerMap.keySet();
+return Collections.unmodifiableSet(containerMap.keySet());
}
/**

View File

@@ -17,6 +17,7 @@
package org.apache.hadoop.hdds.scm;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineAction;
@@ -30,6 +31,8 @@
.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
import org.apache.hadoop.hdds.scm.server
@@ -66,7 +69,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
@@ -98,7 +103,7 @@ public static DatanodeDetails randomDatanodeDetails() {
*
* @return DatanodeDetails
*/
-private static DatanodeDetails createDatanodeDetails(UUID uuid) {
+public static DatanodeDetails createDatanodeDetails(UUID uuid) {
String ipAddress = random.nextInt(256)
+ "." + random.nextInt(256)
+ "." + random.nextInt(256)
@@ -521,4 +526,50 @@ public static StorageContainerManager getScm(OzoneConfiguration conf,
return new StorageContainerManager(conf, configurator);
}
public static ContainerInfo getContainer(
final HddsProtos.LifeCycleState state) {
return new ContainerInfo.Builder()
.setContainerID(RandomUtils.nextLong())
.setReplicationType(HddsProtos.ReplicationType.RATIS)
.setReplicationFactor(HddsProtos.ReplicationFactor.THREE)
.setState(state)
.setOwner("TEST")
.build();
}
public static Set<ContainerReplica> getReplicas(
final ContainerID containerId,
final ContainerReplicaProto.State state,
final DatanodeDetails... datanodeDetails) {
return getReplicas(containerId, state, 10000L, datanodeDetails);
}
public static Set<ContainerReplica> getReplicas(
final ContainerID containerId,
final ContainerReplicaProto.State state,
final long sequenceId,
final DatanodeDetails... datanodeDetails) {
Set<ContainerReplica> replicas = new HashSet<>();
for (DatanodeDetails datanode : datanodeDetails) {
replicas.add(getReplicas(containerId, state,
sequenceId, datanode.getUuid(), datanode));
}
return replicas;
}
public static ContainerReplica getReplicas(
final ContainerID containerId,
final ContainerReplicaProto.State state,
final long sequenceId,
final UUID originNodeId,
final DatanodeDetails datanodeDetails) {
return ContainerReplica.newBuilder()
.setContainerID(containerId)
.setContainerState(state)
.setDatanodeDetails(datanodeDetails)
.setOriginNodeId(originNodeId)
.setSequenceId(sequenceId)
.build();
}
}

View File

@@ -47,10 +47,10 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.hadoop.hdds.scm.container
-.TestContainerReportHelper.getReplicas;
+import static org.apache.hadoop.hdds.scm.TestUtils
+.getReplicas;
-import static org.apache.hadoop.hdds.scm.container
-.TestContainerReportHelper.getContainer;
+import static org.apache.hadoop.hdds.scm.TestUtils
+.getContainer;
import static org.apache.hadoop.hdds.scm.container
.TestContainerReportHelper.addContainerToContainerManager;
import static org.apache.hadoop.hdds.scm.container

View File

@@ -16,19 +16,12 @@
*/
package org.apache.hadoop.hdds.scm.container;
-import org.apache.commons.lang3.RandomUtils;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto
-.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import java.io.IOException;
-import java.util.HashSet;
import java.util.Set;
/**
@@ -77,37 +70,4 @@ static void mockUpdateContainerState(
containerInfo.containerID(), event);
}
-public static ContainerInfo getContainer(final LifeCycleState state) {
-return new ContainerInfo.Builder()
-.setContainerID(RandomUtils.nextLong())
-.setReplicationType(ReplicationType.RATIS)
-.setReplicationFactor(ReplicationFactor.THREE)
-.setState(state)
-.build();
-}
-static Set<ContainerReplica> getReplicas(
-final ContainerID containerId,
-final ContainerReplicaProto.State state,
-final DatanodeDetails... datanodeDetails) {
-return getReplicas(containerId, state, 10000L, datanodeDetails);
-}
-static Set<ContainerReplica> getReplicas(
-final ContainerID containerId,
-final ContainerReplicaProto.State state,
-final long sequenceId,
-final DatanodeDetails... datanodeDetails) {
-Set<ContainerReplica> replicas = new HashSet<>();
-for (DatanodeDetails datanode : datanodeDetails) {
-replicas.add(ContainerReplica.newBuilder()
-.setContainerID(containerId)
-.setContainerState(state)
-.setDatanodeDetails(datanode)
-.setOriginNodeId(datanode.getUuid())
-.setSequenceId(sequenceId)
-.build());
-}
-return replicas;
-}
}

View File

@@ -38,10 +38,8 @@
import static org.apache.hadoop.hdds.scm.container
.TestContainerReportHelper.addContainerToContainerManager;
-import static org.apache.hadoop.hdds.scm.container
-.TestContainerReportHelper.getContainer;
-import static org.apache.hadoop.hdds.scm.container
-.TestContainerReportHelper.getReplicas;
+import static org.apache.hadoop.hdds.scm.TestUtils.getContainer;
+import static org.apache.hadoop.hdds.scm.TestUtils.getReplicas;
import static org.apache.hadoop.hdds.scm.container
.TestContainerReportHelper.mockUpdateContainerReplica;
import static org.apache.hadoop.hdds.scm.container

View File

@@ -0,0 +1,625 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.container;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.hadoop.hdds.scm.TestUtils.createDatanodeDetails;
import static org.apache.hadoop.hdds.scm.TestUtils.getContainer;
import static org.apache.hadoop.hdds.scm.TestUtils.getReplicas;
import static org.apache.hadoop.hdds.scm.TestUtils.randomDatanodeDetails;
/**
* Test cases to verify the functionality of ReplicationManager.
*/
public class TestReplicationManager {
private ReplicationManager replicationManager;
private ContainerStateManager containerStateManager;
private ContainerPlacementPolicy containerPlacementPolicy;
private EventQueue eventQueue;
private DatanodeCommandHandler datanodeCommandHandler;
@Before
public void setup() throws IOException, InterruptedException {
final Configuration conf = new OzoneConfiguration();
final ContainerManager containerManager =
Mockito.mock(ContainerManager.class);
eventQueue = new EventQueue();
containerStateManager = new ContainerStateManager(conf);
datanodeCommandHandler = new DatanodeCommandHandler();
eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, datanodeCommandHandler);
Mockito.when(containerManager.getContainerIDs())
.thenAnswer(invocation -> containerStateManager.getAllContainerIDs());
Mockito.when(containerManager.getContainer(Mockito.any(ContainerID.class)))
.thenAnswer(invocation -> containerStateManager
.getContainer((ContainerID)invocation.getArguments()[0]));
Mockito.when(containerManager.getContainerReplicas(
Mockito.any(ContainerID.class)))
.thenAnswer(invocation -> containerStateManager
.getContainerReplicas((ContainerID)invocation.getArguments()[0]));
containerPlacementPolicy = Mockito.mock(ContainerPlacementPolicy.class);
Mockito.when(containerPlacementPolicy.chooseDatanodes(
Mockito.anyListOf(DatanodeDetails.class),
Mockito.anyInt(), Mockito.anyLong()))
.thenAnswer(invocation -> {
int count = (int) invocation.getArguments()[1];
return IntStream.range(0, count)
.mapToObj(i -> randomDatanodeDetails())
.collect(Collectors.toList());
});
replicationManager = new ReplicationManager(
conf, containerManager, containerPlacementPolicy, eventQueue);
replicationManager.start();
Thread.sleep(100L);
}
/**
* Open containers are not handled by ReplicationManager.
* This test case makes sure that ReplicationManager doesn't take
* any action on OPEN containers.
*/
@Test
public void testOpenContainer() throws SCMException, InterruptedException {
final ContainerInfo container = getContainer(LifeCycleState.OPEN);
containerStateManager.loadContainer(container);
replicationManager.processContainersNow();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
}
/**
* If the container is in CLOSING state we resend close container command
* to all the datanodes.
*/
@Test
public void testClosingContainer() throws
SCMException, ContainerNotFoundException, InterruptedException {
final ContainerInfo container = getContainer(LifeCycleState.CLOSING);
final ContainerID id = container.containerID();
containerStateManager.loadContainer(container);
// Two replicas in CLOSING state
final Set<ContainerReplica> replicas = getReplicas(id, State.CLOSING,
randomDatanodeDetails(),
randomDatanodeDetails());
// One replica in OPEN state
final DatanodeDetails datanode = randomDatanodeDetails();
replicas.addAll(getReplicas(id, State.OPEN, datanode));
for (ContainerReplica replica : replicas) {
containerStateManager.updateContainerReplica(id, replica);
}
final int currentCloseCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
replicationManager.processContainersNow();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
Assert.assertEquals(currentCloseCommandCount + 3, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
// Update the OPEN to CLOSING
for (ContainerReplica replica : getReplicas(id, State.CLOSING, datanode)) {
containerStateManager.updateContainerReplica(id, replica);
}
replicationManager.processContainersNow();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
Assert.assertEquals(currentCloseCommandCount + 6, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
}
/**
* The container is QUASI_CLOSED but two of the replicas are still in
* OPEN state. ReplicationManager should resend the close command to those
* datanodes.
*/
@Test
public void testQuasiClosedContainerWithTwoOpenReplica() throws
SCMException, ContainerNotFoundException, InterruptedException {
final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
final ContainerID id = container.containerID();
final UUID originNodeId = UUID.randomUUID();
final ContainerReplica replicaOne = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaTwo = getReplicas(
id, State.OPEN, 1000L, originNodeId, randomDatanodeDetails());
final DatanodeDetails datanodeDetails = randomDatanodeDetails();
final ContainerReplica replicaThree = getReplicas(
id, State.OPEN, 1000L, datanodeDetails.getUuid(), datanodeDetails);
containerStateManager.loadContainer(container);
containerStateManager.updateContainerReplica(id, replicaOne);
containerStateManager.updateContainerReplica(id, replicaTwo);
containerStateManager.updateContainerReplica(id, replicaThree);
final int currentCloseCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
// Two of the replicas are in OPEN state
replicationManager.processContainersNow();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
Assert.assertEquals(currentCloseCommandCount + 2, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
Assert.assertTrue(datanodeCommandHandler.received(
SCMCommandProto.Type.closeContainerCommand,
replicaTwo.getDatanodeDetails()));
Assert.assertTrue(datanodeCommandHandler.received(
SCMCommandProto.Type.closeContainerCommand,
replicaThree.getDatanodeDetails()));
}
/**
* When the container is in QUASI_CLOSED state, all the replicas are
* also in QUASI_CLOSED state, and there is no quorum to force close
* the container, ReplicationManager will not do anything.
*/
@Test
public void testHealthyQuasiClosedContainer() throws
SCMException, ContainerNotFoundException, InterruptedException {
final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
final ContainerID id = container.containerID();
final UUID originNodeId = UUID.randomUUID();
final ContainerReplica replicaOne = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaTwo = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaThree = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
containerStateManager.loadContainer(container);
containerStateManager.updateContainerReplica(id, replicaOne);
containerStateManager.updateContainerReplica(id, replicaTwo);
containerStateManager.updateContainerReplica(id, replicaThree);
// All the QUASI_CLOSED replicas have the same originNodeId, so the
// container will not be closed. ReplicationManager should take no action.
replicationManager.processContainersNow();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
}
/**
* When a container is QUASI_CLOSED and we don't have quorum to force close
* the container, the container should have all the replicas in QUASI_CLOSED
* state, else ReplicationManager will take action.
*
* In this test case we make one of the replicas unhealthy; ReplicationManager
* will send a delete container command to the datanode which has the
* unhealthy replica.
*/
@Test
public void testQuasiClosedContainerWithUnhealthyReplica()
throws SCMException, ContainerNotFoundException, InterruptedException,
ContainerReplicaNotFoundException {
final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
final ContainerID id = container.containerID();
final UUID originNodeId = UUID.randomUUID();
final ContainerReplica replicaOne = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaTwo = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaThree = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
containerStateManager.loadContainer(container);
containerStateManager.updateContainerReplica(id, replicaOne);
containerStateManager.updateContainerReplica(id, replicaTwo);
containerStateManager.updateContainerReplica(id, replicaThree);
final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
final int currentReplicateCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
// All the QUASI_CLOSED replicas have the same originNodeId, so the
// container will not be closed. ReplicationManager should take no action.
replicationManager.processContainersNow();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
// Make the first replica unhealthy
final ContainerReplica unhealthyReplica = getReplicas(
id, State.UNHEALTHY, 1000L, originNodeId,
replicaOne.getDatanodeDetails());
containerStateManager.updateContainerReplica(id, unhealthyReplica);
replicationManager.processContainersNow();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
Assert.assertTrue(datanodeCommandHandler.received(
SCMCommandProto.Type.deleteContainerCommand,
replicaOne.getDatanodeDetails()));
// Now we will delete the unhealthy replica from in-memory.
containerStateManager.removeContainerReplica(id, replicaOne);
// The container is under replicated as unhealthy replica is removed
replicationManager.processContainersNow();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
// We should get replicate command
Assert.assertEquals(currentReplicateCommandCount + 1,
datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.replicateContainerCommand));
}
/**
* When a QUASI_CLOSED container is over replicated, ReplicationManager
* deletes the excess replicas.
*/
@Test
public void testOverReplicatedQuasiClosedContainer() throws
SCMException, ContainerNotFoundException, InterruptedException {
final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
final ContainerID id = container.containerID();
final UUID originNodeId = UUID.randomUUID();
final ContainerReplica replicaOne = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaTwo = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaThree = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaFour = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
containerStateManager.loadContainer(container);
containerStateManager.updateContainerReplica(id, replicaOne);
containerStateManager.updateContainerReplica(id, replicaTwo);
containerStateManager.updateContainerReplica(id, replicaThree);
containerStateManager.updateContainerReplica(id, replicaFour);
final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
replicationManager.processContainersNow();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
}
/**
* When a QUASI_CLOSED container is over replicated, ReplicationManager
* deletes the excess replicas. While choosing the replica for deletion
* ReplicationManager should prioritize unhealthy replica over QUASI_CLOSED
* replica.
*/
@Test
public void testOverReplicatedQuasiClosedContainerWithUnhealthyReplica()
throws SCMException, ContainerNotFoundException, InterruptedException {
final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
final ContainerID id = container.containerID();
final UUID originNodeId = UUID.randomUUID();
final ContainerReplica replicaOne = getReplicas(
id, State.UNHEALTHY, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaTwo = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaThree = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaFour = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
containerStateManager.loadContainer(container);
containerStateManager.updateContainerReplica(id, replicaOne);
containerStateManager.updateContainerReplica(id, replicaTwo);
containerStateManager.updateContainerReplica(id, replicaThree);
containerStateManager.updateContainerReplica(id, replicaFour);
final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
replicationManager.processContainersNow();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
Assert.assertTrue(datanodeCommandHandler.received(
SCMCommandProto.Type.deleteContainerCommand,
replicaOne.getDatanodeDetails()));
}
/**
* ReplicationManager should replicate a QUASI_CLOSED replica if it is
* under replicated.
*/
@Test
public void testUnderReplicatedQuasiClosedContainer() throws
SCMException, ContainerNotFoundException, InterruptedException {
final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
final ContainerID id = container.containerID();
final UUID originNodeId = UUID.randomUUID();
final ContainerReplica replicaOne = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaTwo = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
containerStateManager.loadContainer(container);
containerStateManager.updateContainerReplica(id, replicaOne);
containerStateManager.updateContainerReplica(id, replicaTwo);
final int currentReplicateCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
replicationManager.processContainersNow();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
Assert.assertEquals(currentReplicateCommandCount + 1,
datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.replicateContainerCommand));
}
/**
* When a QUASI_CLOSED container is under replicated, ReplicationManager
* should re-replicate it. If there is an unhealthy replica, it has to
* be deleted.
*
* In this test case, the container is QUASI_CLOSED and is under replicated
* and also has an unhealthy replica.
*
* In the first iteration of ReplicationManager, it should re-replicate
* the container so that it has enough replicas.
*
* In the second iteration, ReplicationManager should delete the unhealthy
* replica.
*
* In the third iteration, ReplicationManager will re-replicate as the
* container has again become under replicated after the unhealthy
* replica has been deleted.
*
*/
@Test
public void testUnderReplicatedQuasiClosedContainerWithUnhealthyReplica()
throws SCMException, ContainerNotFoundException, InterruptedException,
ContainerReplicaNotFoundException {
final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
final ContainerID id = container.containerID();
final UUID originNodeId = UUID.randomUUID();
final ContainerReplica replicaOne = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaTwo = getReplicas(
id, State.UNHEALTHY, 1000L, originNodeId, randomDatanodeDetails());
containerStateManager.loadContainer(container);
containerStateManager.updateContainerReplica(id, replicaOne);
containerStateManager.updateContainerReplica(id, replicaTwo);
final int currentReplicateCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
replicationManager.processContainersNow();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
Assert.assertEquals(currentReplicateCommandCount + 1,
datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.replicateContainerCommand));
Optional<CommandForDatanode> replicateCommand = datanodeCommandHandler
.getReceivedCommands().stream()
.filter(c -> c.getCommand().getType()
.equals(SCMCommandProto.Type.replicateContainerCommand))
.findFirst();
Assert.assertTrue(replicateCommand.isPresent());
DatanodeDetails newNode = createDatanodeDetails(
replicateCommand.get().getDatanodeId());
ContainerReplica newReplica = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, newNode);
containerStateManager.updateContainerReplica(id, newReplica);
/*
* We have reported the replica to SCM; in the next ReplicationManager
* iteration it should delete the unhealthy replica.
*/
replicationManager.processContainersNow();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
// ReplicaTwo should be deleted, that is the unhealthy one
Assert.assertTrue(datanodeCommandHandler.received(
SCMCommandProto.Type.deleteContainerCommand,
replicaTwo.getDatanodeDetails()));
containerStateManager.removeContainerReplica(id, replicaTwo);
/*
* We have now removed the unhealthy replica; the next iteration of
* ReplicationManager should re-replicate the container as it
* is under replicated now
*/
replicationManager.processContainersNow();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
Assert.assertEquals(currentReplicateCommandCount + 2,
datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.replicateContainerCommand));
}
/**
* When a container is QUASI_CLOSED and it has >50% of its replicas
* in QUASI_CLOSED state with unique origin node id,
* ReplicationManager should force close the replica(s) with
* highest BCSID.
*/
@Test
public void testQuasiClosedToClosed() throws
SCMException, ContainerNotFoundException, InterruptedException {
final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
final ContainerID id = container.containerID();
final Set<ContainerReplica> replicas = getReplicas(id, State.QUASI_CLOSED,
randomDatanodeDetails(),
randomDatanodeDetails(),
randomDatanodeDetails());
containerStateManager.loadContainer(container);
for (ContainerReplica replica : replicas) {
containerStateManager.updateContainerReplica(id, replica);
}
final int currentCloseCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
replicationManager.processContainersNow();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
// All the replicas have same BCSID, so all of them will be closed.
Assert.assertEquals(currentCloseCommandCount + 3, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
}
/**
* ReplicationManager should not take any action if the container is
* CLOSED and healthy.
*/
@Test
public void testHealthyClosedContainer()
throws SCMException, ContainerNotFoundException, InterruptedException {
final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
final ContainerID id = container.containerID();
final Set<ContainerReplica> replicas = getReplicas(id, State.CLOSED,
randomDatanodeDetails(),
randomDatanodeDetails(),
randomDatanodeDetails());
containerStateManager.loadContainer(container);
for (ContainerReplica replica : replicas) {
containerStateManager.updateContainerReplica(id, replica);
}
replicationManager.processContainersNow();
// Wait for EventQueue to call the event handler
Thread.sleep(100L);
Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
}
@After
public void teardown() throws IOException {
containerStateManager.close();
replicationManager.stop();
}
private class DatanodeCommandHandler implements
EventHandler<CommandForDatanode> {
private AtomicInteger invocation = new AtomicInteger(0);
private Map<SCMCommandProto.Type, AtomicInteger> commandInvocation =
new HashMap<>();
private List<CommandForDatanode> commands = new ArrayList<>();
@Override
public void onMessage(final CommandForDatanode command,
final EventPublisher publisher) {
final SCMCommandProto.Type type = command.getCommand().getType();
commandInvocation.computeIfAbsent(type, k -> new AtomicInteger(0));
commandInvocation.get(type).incrementAndGet();
invocation.incrementAndGet();
commands.add(command);
}
private int getInvocation() {
return invocation.get();
}
private int getInvocationCount(SCMCommandProto.Type type) {
return commandInvocation.containsKey(type) ?
commandInvocation.get(type).get() : 0;
}
private List<CommandForDatanode> getReceivedCommands() {
return commands;
}
/**
* Returns true if the command handler has received the given
* command type for the provided datanode.
*
* @param type Command Type
* @param datanode DatanodeDetails
* @return True if command was received, false otherwise
*/
private boolean received(final SCMCommandProto.Type type,
final DatanodeDetails datanode) {
return commands.stream().anyMatch(dc ->
dc.getCommand().getType().equals(type) &&
dc.getDatanodeId().equals(datanode.getUuid()));
}
}
}