HDDS-1368. Cleanup old ReplicationManager code from SCM.

This commit is contained in:
Nanda kumar 2019-04-23 17:35:39 +05:30
parent 8a95ea61e1
commit 7e1f8d3a1b
No known key found for this signature in database
GPG Key ID: CE6C8AB1204780DF
20 changed files with 148 additions and 1424 deletions

View File

@ -19,9 +19,10 @@ package org.apache.hadoop.ozone.container.common.statemachine;
import com.google.common.base.Preconditions;
import com.google.protobuf.GeneratedMessage;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineAction;
import org.apache.hadoop.hdds.protocol.proto
@ -34,8 +35,6 @@ import org.apache.hadoop.ozone.container.common.states.datanode
import org.apache.hadoop.ozone.container.common.states.datanode
.RunningDatanodeState;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus
.CommandStatusBuilder;
import org.apache.hadoop.ozone.protocol.commands
.DeleteBlockCommandStatus.DeleteBlockCommandStatusBuilder;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
@ -432,27 +431,14 @@ public class StateContext {
* @param cmd - {@link SCMCommand}.
*/
public void addCmdStatus(SCMCommand cmd) {
final Optional<CommandStatusBuilder> cmdStatusBuilder;
switch (cmd.getType()) {
case replicateContainerCommand:
cmdStatusBuilder = Optional.of(CommandStatusBuilder.newBuilder());
break;
case deleteBlocksCommand:
cmdStatusBuilder = Optional.of(
DeleteBlockCommandStatusBuilder.newBuilder());
break;
case deleteContainerCommand:
cmdStatusBuilder = Optional.of(CommandStatusBuilder.newBuilder());
break;
default:
cmdStatusBuilder = Optional.empty();
if (cmd.getType() == SCMCommandProto.Type.deleteBlocksCommand) {
addCmdStatus(cmd.getId(),
DeleteBlockCommandStatusBuilder.newBuilder()
.setCmdId(cmd.getId())
.setStatus(Status.PENDING)
.setType(cmd.getType())
.build());
}
cmdStatusBuilder.ifPresent(statusBuilder ->
addCmdStatus(cmd.getId(), statusBuilder
.setCmdId(cmd.getId())
.setStatus(Status.PENDING)
.setType(cmd.getType())
.build()));
}
/**

View File

@ -56,11 +56,7 @@ public class DeleteContainerCommandHandler implements CommandHandler {
final ContainerController controller = ozoneContainer.getController();
controller.deleteContainer(deleteContainerCommand.getContainerID(),
deleteContainerCommand.isForce());
updateCommandStatus(context, command,
(cmdStatus) -> cmdStatus.setStatus(true), LOG);
} catch (IOException e) {
updateCommandStatus(context, command,
(cmdStatus) -> cmdStatus.setStatus(false), LOG);
LOG.error("Exception occurred while deleting the container.", e);
} finally {
totalTime += Time.monotonicNow() - startTime;

View File

@ -61,25 +61,17 @@ public class ReplicateContainerCommandHandler implements CommandHandler {
public void handle(SCMCommand command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {
ReplicateContainerCommand replicateCommand =
final ReplicateContainerCommand replicateCommand =
(ReplicateContainerCommand) command;
try {
List<DatanodeDetails> sourceDatanodes =
replicateCommand.getSourceDatanodes();
long containerID = replicateCommand.getContainerID();
final List<DatanodeDetails> sourceDatanodes =
replicateCommand.getSourceDatanodes();
final long containerID = replicateCommand.getContainerID();
Preconditions.checkArgument(sourceDatanodes.size() > 0,
String.format("Replication command is received for container %d "
+ "but the size of source datanodes was 0.", containerID));
Preconditions.checkArgument(sourceDatanodes.size() > 0,
String.format("Replication command is received for container %d "
+ "but the size of source datanodes was 0.", containerID));
ReplicationTask replicationTask =
new ReplicationTask(containerID, sourceDatanodes);
supervisor.addTask(replicationTask);
} finally {
updateCommandStatus(context, command,
(cmdStatus) -> cmdStatus.setStatus(true), LOG);
}
supervisor.addTask(new ReplicationTask(containerID, sourceDatanodes));
}
@Override

View File

@ -18,9 +18,10 @@
package org.apache.hadoop.hdds.scm.command;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.CommandStatus;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.CommandStatusReportFromDatanode;
@ -54,32 +55,14 @@ public class CommandStatusReportHandler implements
cmdStatusList.forEach(cmdStatus -> {
LOGGER.trace("Emitting command status for id:{} type: {}", cmdStatus
.getCmdId(), cmdStatus.getType());
switch (cmdStatus.getType()) {
case replicateContainerCommand:
publisher.fireEvent(SCMEvents.REPLICATION_STATUS, new
ReplicationStatus(cmdStatus));
if (cmdStatus.getStatus() == CommandStatus.Status.EXECUTED) {
publisher.fireEvent(SCMEvents.REPLICATION_COMPLETE,
new ReplicationManager.ReplicationCompleted(
cmdStatus.getCmdId()));
}
break;
case deleteBlocksCommand:
if (cmdStatus.getType() == SCMCommandProto.Type.deleteBlocksCommand) {
if (cmdStatus.getStatus() == CommandStatus.Status.EXECUTED) {
publisher.fireEvent(SCMEvents.DELETE_BLOCK_STATUS,
new DeleteBlockStatus(cmdStatus));
}
break;
case deleteContainerCommand:
if (cmdStatus.getStatus() == CommandStatus.Status.EXECUTED) {
publisher.fireEvent(SCMEvents.DELETE_CONTAINER_COMMAND_COMPLETE,
new ReplicationManager.DeleteContainerCommandCompleted(
cmdStatus.getCmdId()));
}
default:
} else {
LOGGER.debug("CommandStatus of type:{} not handled in " +
"CommandStatusReportHandler.", cmdStatus.getType());
break;
}
});
}
@ -109,24 +92,6 @@ public class CommandStatusReportHandler implements
}
}
/**
* Wrapper event for Replicate Command.
*/
public static class ReplicationStatus extends CommandStatusEvent {
public ReplicationStatus(CommandStatus cmdStatus) {
super(cmdStatus);
}
}
/**
* Wrapper event for CloseContainer Command.
*/
public static class CloseContainerStatus extends CommandStatusEvent {
public CloseContainerStatus(CommandStatus cmdStatus) {
super(cmdStatus);
}
}
/**
* Wrapper event for DeleteBlock Command.
*/

View File

@ -1,56 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.container;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
.DeletionRequestToRepeat;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
.DeleteContainerCommandCompleted;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.server.events.Event;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventWatcher;
import org.apache.hadoop.ozone.lease.LeaseManager;
/**
* Command watcher to track the delete container commands.
*/
public class DeleteContainerCommandWatcher extends
EventWatcher<DeletionRequestToRepeat, DeleteContainerCommandCompleted> {
public DeleteContainerCommandWatcher(
Event<DeletionRequestToRepeat> startEvent,
Event<DeleteContainerCommandCompleted> completionEvent,
LeaseManager<Long> leaseManager) {
super(startEvent, completionEvent, leaseManager);
}
@Override
protected void onTimeout(EventPublisher publisher,
DeletionRequestToRepeat payload) {
//put back to the original queue
publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
payload.getRequest());
}
@Override
protected void onFinished(EventPublisher publisher,
DeletionRequestToRepeat payload) {
}
}

View File

@ -1,56 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.container.replication;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
.ReplicationCompleted;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
.ReplicationRequestToRepeat;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.server.events.Event;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventWatcher;
import org.apache.hadoop.ozone.lease.LeaseManager;
/**
* Command watcher to track the replication commands.
*/
public class ReplicationCommandWatcher
extends
EventWatcher<ReplicationManager.ReplicationRequestToRepeat,
ReplicationManager.ReplicationCompleted> {
public ReplicationCommandWatcher(Event<ReplicationRequestToRepeat> startEvent,
Event<ReplicationCompleted> completionEvent,
LeaseManager<Long> leaseManager) {
super(startEvent, completionEvent, leaseManager);
}
@Override
protected void onTimeout(EventPublisher publisher,
ReplicationRequestToRepeat payload) {
//put back to the original queue
publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
payload.getRequest());
}
@Override
protected void onFinished(EventPublisher publisher,
ReplicationRequestToRepeat payload) {
}
}

View File

@ -1,384 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.container.replication;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.DeleteContainerCommandWatcher;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
import org.apache.hadoop.ozone.lease.LeaseManager;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import static org.apache.hadoop.hdds.scm.events.SCMEvents
.TRACK_DELETE_CONTAINER_COMMAND;
import static org.apache.hadoop.hdds.scm.events.SCMEvents
.TRACK_REPLICATE_COMMAND;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Replication Manager manages the replication of the closed container.
*/
public class ReplicationManager implements Runnable {
private static final Logger LOG =
LoggerFactory.getLogger(ReplicationManager.class);
private ReplicationQueue replicationQueue;
private ContainerPlacementPolicy containerPlacement;
private EventPublisher eventPublisher;
private ReplicationCommandWatcher replicationCommandWatcher;
private DeleteContainerCommandWatcher deleteContainerCommandWatcher;
private boolean running = true;
private ContainerManager containerManager;
public ReplicationManager(ContainerPlacementPolicy containerPlacement,
ContainerManager containerManager, EventQueue eventQueue,
LeaseManager<Long> commandWatcherLeaseManager) {
this.containerPlacement = containerPlacement;
this.containerManager = containerManager;
this.eventPublisher = eventQueue;
this.replicationCommandWatcher =
new ReplicationCommandWatcher(TRACK_REPLICATE_COMMAND,
SCMEvents.REPLICATION_COMPLETE, commandWatcherLeaseManager);
this.deleteContainerCommandWatcher =
new DeleteContainerCommandWatcher(TRACK_DELETE_CONTAINER_COMMAND,
SCMEvents.DELETE_CONTAINER_COMMAND_COMPLETE,
commandWatcherLeaseManager);
this.replicationQueue = new ReplicationQueue();
eventQueue.addHandler(SCMEvents.REPLICATE_CONTAINER,
(replicationRequest, publisher) -> replicationQueue
.add(replicationRequest));
this.replicationCommandWatcher.start(eventQueue);
}
public void start() {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Replication Manager").build();
threadFactory.newThread(this).start();
}
@Override
public void run() {
while (running) {
ReplicationRequest request = null;
try {
//TODO: add throttling here
request = replicationQueue.take();
ContainerID containerID = new ContainerID(request.getContainerId());
ContainerInfo container = containerManager.getContainer(containerID);
final HddsProtos.LifeCycleState state = container.getState();
if (state != LifeCycleState.CLOSED &&
state != LifeCycleState.QUASI_CLOSED) {
LOG.warn("Cannot replicate the container {} when in {} state.",
containerID, state);
continue;
}
//check the current replication
List<ContainerReplica> containerReplicas =
new ArrayList<>(getCurrentReplicas(request));
if (containerReplicas.size() == 0) {
LOG.warn(
"Container {} should be replicated but can't find any existing "
+ "replicas",
containerID);
return;
}
final ReplicationRequest finalRequest = request;
int inFlightReplications = replicationCommandWatcher.getTimeoutEvents(
e -> e.getRequest().getContainerId()
== finalRequest.getContainerId())
.size();
int inFlightDelete = deleteContainerCommandWatcher.getTimeoutEvents(
e -> e.getRequest().getContainerId()
== finalRequest.getContainerId())
.size();
int deficit =
(request.getExpecReplicationCount() - containerReplicas.size())
- (inFlightReplications - inFlightDelete);
if (deficit > 0) {
List<DatanodeDetails> datanodes = containerReplicas.stream()
.sorted((r1, r2) ->
r2.getSequenceId().compareTo(r1.getSequenceId()))
.map(ContainerReplica::getDatanodeDetails)
.collect(Collectors.toList());
List<DatanodeDetails> selectedDatanodes = containerPlacement
.chooseDatanodes(datanodes, deficit, container.getUsedBytes());
//send the command
for (DatanodeDetails datanode : selectedDatanodes) {
LOG.info("Container {} is under replicated." +
" Expected replica count is {}, but found {}." +
" Re-replicating it on {}.",
container.containerID(), request.getExpecReplicationCount(),
containerReplicas.size(), datanode);
ReplicateContainerCommand replicateCommand =
new ReplicateContainerCommand(containerID.getId(), datanodes);
eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
new CommandForDatanode<>(
datanode.getUuid(), replicateCommand));
ReplicationRequestToRepeat timeoutEvent =
new ReplicationRequestToRepeat(replicateCommand.getId(),
request);
eventPublisher.fireEvent(TRACK_REPLICATE_COMMAND, timeoutEvent);
}
} else if (deficit < 0) {
int numberOfReplicasToDelete = Math.abs(deficit);
final Map<UUID, List<DatanodeDetails>> originIdToDnMap =
new LinkedHashMap<>();
containerReplicas.stream()
.sorted(Comparator.comparing(ContainerReplica::getSequenceId))
.forEach(replica -> {
originIdToDnMap.computeIfAbsent(
replica.getOriginDatanodeId(), key -> new ArrayList<>());
originIdToDnMap.get(replica.getOriginDatanodeId())
.add(replica.getDatanodeDetails());
});
for (List<DatanodeDetails> listOfReplica : originIdToDnMap.values()) {
if (listOfReplica.size() > 1) {
final int toDelete = Math.min(listOfReplica.size() - 1,
numberOfReplicasToDelete);
final DeleteContainerCommand deleteContainer =
new DeleteContainerCommand(containerID.getId(), true);
for (int i = 0; i < toDelete; i++) {
LOG.info("Container {} is over replicated." +
" Expected replica count is {}, but found {}." +
" Deleting the replica on {}.",
container.containerID(), request.getExpecReplicationCount(),
containerReplicas.size(), listOfReplica.get(i));
eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
new CommandForDatanode<>(listOfReplica.get(i).getUuid(),
deleteContainer));
DeletionRequestToRepeat timeoutEvent =
new DeletionRequestToRepeat(deleteContainer.getId(),
request);
eventPublisher.fireEvent(
TRACK_DELETE_CONTAINER_COMMAND, timeoutEvent);
}
numberOfReplicasToDelete -= toDelete;
}
if (numberOfReplicasToDelete == 0) {
break;
}
}
if (numberOfReplicasToDelete != 0) {
final int expectedReplicaCount = container
.getReplicationFactor().getNumber();
LOG.warn("Not able to delete the container replica of Container" +
" {} even though it is over replicated. Expected replica" +
" count is {}, current replica count is {}.",
containerID, expectedReplicaCount,
expectedReplicaCount + numberOfReplicasToDelete);
}
}
} catch (Exception e) {
LOG.error("Can't replicate container {}", request, e);
}
}
}
@VisibleForTesting
protected Set<ContainerReplica> getCurrentReplicas(ReplicationRequest request)
throws IOException {
return containerManager
.getContainerReplicas(new ContainerID(request.getContainerId()));
}
@VisibleForTesting
public ReplicationQueue getReplicationQueue() {
return replicationQueue;
}
public void stop() {
running = false;
}
/**
* Event for the ReplicationCommandWatcher to repeat the embedded request.
* in case fof timeout.
*/
public static class ReplicationRequestToRepeat
extends ContainerRequestToRepeat {
public ReplicationRequestToRepeat(
long commandId, ReplicationRequest request) {
super(commandId, request);
}
}
/**
* Event for the DeleteContainerCommandWatcher to repeat the
* embedded request. In case fof timeout.
*/
public static class DeletionRequestToRepeat
extends ContainerRequestToRepeat {
public DeletionRequestToRepeat(
long commandId, ReplicationRequest request) {
super(commandId, request);
}
}
/**
* Container Request wrapper which will be used by ReplicationManager to
* perform the intended operation.
*/
public static class ContainerRequestToRepeat
implements IdentifiableEventPayload {
private final long commandId;
private final ReplicationRequest request;
ContainerRequestToRepeat(long commandId,
ReplicationRequest request) {
this.commandId = commandId;
this.request = request;
}
public ReplicationRequest getRequest() {
return request;
}
@Override
public long getId() {
return commandId;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ContainerRequestToRepeat that = (ContainerRequestToRepeat) o;
return Objects.equals(request, that.request);
}
@Override
public int hashCode() {
return Objects.hash(request);
}
}
/**
* Event which indicates that the replicate operation is completed.
*/
public static class ReplicationCompleted
implements IdentifiableEventPayload {
private final long uuid;
public ReplicationCompleted(long uuid) {
this.uuid = uuid;
}
@Override
public long getId() {
return uuid;
}
}
/**
* Event which indicates that the container deletion operation is completed.
*/
public static class DeleteContainerCommandCompleted
implements IdentifiableEventPayload {
private final long uuid;
public DeleteContainerCommandCompleted(long uuid) {
this.uuid = uuid;
}
@Override
public long getId() {
return uuid;
}
}
}

View File

@ -1,73 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.container.replication;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
/**
* Priority queue to handle under-replicated and over replicated containers
* in ozone. ReplicationManager will consume these messages and decide
* accordingly.
*/
public class ReplicationQueue {
private final BlockingQueue<ReplicationRequest> queue;
public ReplicationQueue() {
queue = new PriorityBlockingQueue<>();
}
public boolean add(ReplicationRequest repObj) {
if (this.queue.contains(repObj)) {
// Remove the earlier message and insert this one
this.queue.remove(repObj);
}
return this.queue.add(repObj);
}
public boolean remove(ReplicationRequest repObj) {
return queue.remove(repObj);
}
/**
* Retrieves, but does not remove, the head of this queue,
* or returns {@code null} if this queue is empty.
*
* @return the head of this queue, or {@code null} if this queue is empty
*/
public ReplicationRequest peek() {
return queue.peek();
}
/**
* Retrieves and removes the head of this queue (blocking queue).
*/
public ReplicationRequest take() throws InterruptedException {
return queue.take();
}
public boolean removeAll(List<ReplicationRequest> repObjs) {
return queue.removeAll(repObjs);
}
public int size() {
return queue.size();
}
}

View File

@ -1,123 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.container.replication;
import java.io.Serializable;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
/**
* Wrapper class for hdds replication queue. Implements its natural
* ordering for priority queue.
*/
public class ReplicationRequest implements Comparable<ReplicationRequest>,
Serializable {
private final long containerId;
private final int replicationCount;
private final int expecReplicationCount;
private final long timestamp;
public ReplicationRequest(long containerId, int replicationCount,
long timestamp, int expecReplicationCount) {
this.containerId = containerId;
this.replicationCount = replicationCount;
this.timestamp = timestamp;
this.expecReplicationCount = expecReplicationCount;
}
public ReplicationRequest(long containerId, int replicationCount,
int expecReplicationCount) {
this(containerId, replicationCount, System.currentTimeMillis(),
expecReplicationCount);
}
/**
* Compares this object with the specified object for order. Returns a
* negative integer, zero, or a positive integer as this object is less
* than, equal to, or greater than the specified object.
* @param o the object to be compared.
* @return a negative integer, zero, or a positive integer as this object
* is less than, equal to, or greater than the specified object.
* @throws NullPointerException if the specified object is null
* @throws ClassCastException if the specified object's type prevents it
* from being compared to this object.
*/
@Override
public int compareTo(ReplicationRequest o) {
if (o == null) {
return 1;
}
if (this == o) {
return 0;
}
int retVal = Integer
.compare(getReplicationCount() - getExpecReplicationCount(),
o.getReplicationCount() - o.getExpecReplicationCount());
if (retVal != 0) {
return retVal;
}
return Long.compare(getTimestamp(), o.getTimestamp());
}
@Override
public int hashCode() {
return new HashCodeBuilder(91, 1011)
.append(getContainerId())
.toHashCode();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ReplicationRequest that = (ReplicationRequest) o;
return new EqualsBuilder().append(getContainerId(), that.getContainerId())
.isEquals();
}
public long getContainerId() {
return containerId;
}
public int getReplicationCount() {
return replicationCount;
}
public long getTimestamp() {
return timestamp;
}
public int getExpecReplicationCount() {
return expecReplicationCount;
}
@Override
public String toString() {
return "ReplicationRequest{" +
"containerId=" + containerId +
", replicationCount=" + replicationCount +
", expecReplicationCount=" + expecReplicationCount +
", timestamp=" + timestamp +
'}';
}
}

View File

@ -23,8 +23,6 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
.ReplicationStatus;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.IncrementalContainerReportFromDatanode;
@ -40,14 +38,8 @@ import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.ContainerReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.NodeReportFromDatanode;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
.DeleteContainerCommandCompleted;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
.ReplicationCompleted;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer.NodeRegistrationContainerReport;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer
.NodeRegistrationContainerReport;
import org.apache.hadoop.hdds.server.events.Event;
import org.apache.hadoop.hdds.server.events.TypedEvent;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
@ -184,12 +176,6 @@ public final class SCMEvents {
public static final TypedEvent<DatanodeDetails> NON_HEALTHY_TO_HEALTHY_NODE =
new TypedEvent<>(DatanodeDetails.class, "NON_HEALTHY_TO_HEALTHY_NODE");
/**
* This event will be triggered by CommandStatusReportHandler whenever a
* status for Replication SCMCommand is received.
*/
public static final Event<ReplicationStatus> REPLICATION_STATUS = new
TypedEvent<>(ReplicationStatus.class, "Replicate_Command_Status");
/**
* This event will be triggered by CommandStatusReportHandler whenever a
* status for DeleteBlock SCMCommand is received.
@ -207,53 +193,6 @@ public final class SCMEvents {
public static final Event<PendingDeleteStatusList> PENDING_DELETE_STATUS =
new TypedEvent<>(PendingDeleteStatusList.class, "Pending_Delete_Status");
/**
* This is the command for ReplicationManager to handle under/over
* replication. Sent by the ContainerReportHandler after processing the
* heartbeat.
*/
public static final TypedEvent<ReplicationRequest> REPLICATE_CONTAINER =
new TypedEvent<>(ReplicationRequest.class);
/**
* This event is sent by the ReplicaManager to the
* ReplicationCommandWatcher to track the in-progress replication.
*/
public static final TypedEvent<ReplicationManager.ReplicationRequestToRepeat>
TRACK_REPLICATE_COMMAND =
new TypedEvent<>(ReplicationManager.ReplicationRequestToRepeat.class);
/**
* This event is sent by the ReplicaManager to the
* DeleteContainerCommandWatcher to track the in-progress delete commands.
*/
public static final TypedEvent<ReplicationManager.DeletionRequestToRepeat>
TRACK_DELETE_CONTAINER_COMMAND =
new TypedEvent<>(ReplicationManager.DeletionRequestToRepeat.class);
/**
* This event comes from the Heartbeat dispatcher (in fact from the
* datanode) to notify the scm that the replication is done. This is
* received by the replicate command watcher to mark in-progress task as
* finished.
<p>
* TODO: Temporary event, should be replaced by specific Heartbeat
* ActionRequred event.
*/
public static final TypedEvent<ReplicationCompleted> REPLICATION_COMPLETE =
new TypedEvent<>(ReplicationCompleted.class);
public static final TypedEvent<DeleteContainerCommandCompleted>
DELETE_CONTAINER_COMMAND_COMPLETE =
new TypedEvent<>(DeleteContainerCommandCompleted.class);
/**
* Signal for all the components (but especially for the replication
* manager and container report handler) that the replication could be
* started. Should be send only if (almost) all the container state are
* available from the datanodes.
*/
public static final TypedEvent<Boolean> START_REPLICATION =
new TypedEvent<>(Boolean.class);
public static final TypedEvent<SafeModeStatus> SAFE_MODE_STATUS =
new TypedEvent<>(SafeModeStatus.class);

View File

@ -18,121 +18,155 @@
package org.apache.hadoop.hdds.scm.node;
import java.util.Set;
import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerException;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.CLOSE_CONTAINER;
/**
* Handles Dead Node event.
*/
public class DeadNodeHandler implements EventHandler<DatanodeDetails> {
private final ContainerManager containerManager;
private final NodeManager nodeManager;
private final PipelineManager pipelineManager;
private final ContainerManager containerManager;
private static final Logger LOG =
LoggerFactory.getLogger(DeadNodeHandler.class);
public DeadNodeHandler(NodeManager nodeManager,
ContainerManager containerManager) {
this.containerManager = containerManager;
public DeadNodeHandler(final NodeManager nodeManager,
final PipelineManager pipelineManager,
final ContainerManager containerManager) {
this.nodeManager = nodeManager;
this.pipelineManager = pipelineManager;
this.containerManager = containerManager;
}
@Override
public void onMessage(DatanodeDetails datanodeDetails,
EventPublisher publisher) {
public void onMessage(final DatanodeDetails datanodeDetails,
final EventPublisher publisher) {
// TODO: check if there are any pipeline on this node and fire close
// pipeline event
Set<ContainerID> ids =
null;
try {
ids = nodeManager.getContainers(datanodeDetails);
} catch (NodeNotFoundException e) {
/*
* We should have already destroyed all the pipelines on this datanode
* when it was marked as stale. Destroy pipeline should also have closed
* all the containers on this datanode.
*
* Ideally we should not have any pipeline or OPEN containers now.
*
* To be on a safer side, we double check here and take appropriate
* action.
*/
destroyPipelines(datanodeDetails);
closeContainers(datanodeDetails, publisher);
// Remove the container replicas associated with the dead node.
removeContainerReplicas(datanodeDetails);
} catch (NodeNotFoundException ex) {
// This should not happen, we cannot get a dead node event for an
// unregistered node!
// unregistered datanode!
LOG.error("DeadNode event for a unregistered node: {}!", datanodeDetails);
}
if (ids == null) {
LOG.info("There's no containers in dead datanode {}, no replica will be"
+ " removed from the in-memory state.", datanodeDetails.getUuid());
return;
}
LOG.info("Datanode {} is dead. Removing replications from the in-memory" +
" state.", datanodeDetails.getUuid());
for (ContainerID id : ids) {
try {
final ContainerInfo container = containerManager.getContainer(id);
// TODO: For open containers, trigger close on other nodes
if (!container.isOpen()) {
Set<ContainerReplica> replicas = containerManager
.getContainerReplicas(id);
replicas.stream()
.filter(r -> r.getDatanodeDetails().equals(datanodeDetails))
.findFirst()
.ifPresent(replica -> {
try {
containerManager.removeContainerReplica(id, replica);
ContainerInfo containerInfo =
containerManager.getContainer(id);
replicateIfNeeded(containerInfo, publisher);
} catch (ContainerException ex) {
LOG.warn("Exception while removing container replica #{} " +
"for container #{}.", replica, container, ex);
}
});
}
} catch (ContainerNotFoundException cnfe) {
LOG.warn("Container Not found!", cnfe);
}
}
}
/**
* Compare the existing replication number with the expected one.
* Destroys all the pipelines on the given datanode if there are any.
*
* @param datanodeDetails DatanodeDetails
*/
private void replicateIfNeeded(ContainerInfo container,
EventPublisher publisher) throws ContainerNotFoundException {
// Replicate only closed and Quasi closed containers
if (container.getState() == HddsProtos.LifeCycleState.CLOSED ||
container.getState() == HddsProtos.LifeCycleState.QUASI_CLOSED) {
final int existingReplicas = containerManager
.getContainerReplicas(container.containerID()).size();
final int expectedReplicas = container.getReplicationFactor().getNumber();
if (existingReplicas != expectedReplicas) {
LOG.debug("Replicate Request fired for container {}, exisiting " +
"replica count {}, expected replica count {}",
container.getContainerID(), existingReplicas, expectedReplicas);
publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
new ReplicationRequest(
container.getContainerID(), existingReplicas,
expectedReplicas));
}
}
private void destroyPipelines(final DatanodeDetails datanodeDetails) {
Optional.ofNullable(nodeManager.getPipelines(datanodeDetails))
.ifPresent(pipelines ->
pipelines.forEach(id -> {
try {
pipelineManager.finalizeAndDestroyPipeline(
pipelineManager.getPipeline(id), false);
} catch (PipelineNotFoundException ignore) {
// Pipeline is not there in pipeline manager,
// should we care?
} catch (IOException ex) {
LOG.warn("Exception while finalizing pipeline {}",
id, ex);
}
}));
}
/**
* Returns logger.
* */
// TODO: remove this.
public static Logger getLogger() {
return LOG;
* Sends CloseContainerCommand to all the open containers on the
* given datanode.
*
* @param datanodeDetails DatanodeDetails
* @param publisher EventPublisher
* @throws NodeNotFoundException
*/
private void closeContainers(final DatanodeDetails datanodeDetails,
final EventPublisher publisher)
throws NodeNotFoundException {
nodeManager.getContainers(datanodeDetails)
.forEach(id -> {
try {
final ContainerInfo container = containerManager.getContainer(id);
if (container.getState() == HddsProtos.LifeCycleState.OPEN) {
publisher.fireEvent(CLOSE_CONTAINER, id);
}
} catch (ContainerNotFoundException cnfe) {
LOG.warn("Container {} is not managed by ContainerManager.",
id, cnfe);
}
});
}
/**
* Removes the ContainerReplica of the dead datanode from the containers
* which are hosted by that datanode.
*
* @param datanodeDetails DatanodeDetails
* @throws NodeNotFoundException
*/
private void removeContainerReplicas(final DatanodeDetails datanodeDetails)
throws NodeNotFoundException {
nodeManager.getContainers(datanodeDetails)
.forEach(id -> {
try {
final ContainerInfo container = containerManager.getContainer(id);
// Identify and remove the ContainerReplica of dead node
containerManager.getContainerReplicas(id)
.stream()
.filter(r -> r.getDatanodeDetails().equals(datanodeDetails))
.findFirst()
.ifPresent(replica -> {
try {
containerManager.removeContainerReplica(id, replica);
} catch (ContainerException ex) {
LOG.warn("Exception while removing container replica #{} " +
"of container {}.", replica, container, ex);
}
});
} catch (ContainerNotFoundException cnfe) {
LOG.warn("Container {} is not managed by ContainerManager.",
id, cnfe);
}
});
}
}

View File

@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hdds.scm.node;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos;
@ -83,8 +82,7 @@ import java.util.stream.Collectors;
*/
public class SCMNodeManager implements NodeManager {
@VisibleForTesting
static final Logger LOG =
private static final Logger LOG =
LoggerFactory.getLogger(SCMNodeManager.class);
private final NodeStateManager nodeStateManager;

View File

@ -297,7 +297,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
StaleNodeHandler staleNodeHandler =
new StaleNodeHandler(scmNodeManager, pipelineManager, conf);
DeadNodeHandler deadNodeHandler = new DeadNodeHandler(scmNodeManager,
containerManager);
pipelineManager, containerManager);
NonHealthyToHealthyNodeHandler nonHealthyToHealthyNodeHandler =
new NonHealthyToHealthyNodeHandler(pipelineManager, conf);
ContainerActionsHandler actionsHandler = new ContainerActionsHandler();

View File

@ -37,8 +37,6 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.test.GenericTestUtils;
@ -57,7 +55,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.MB;
/**
* Tests for SCM Block Manager.
*/
public class TestBlockManager implements EventHandler<Boolean> {
public class TestBlockManager {
private StorageContainerManager scm;
private SCMContainerManager mapping;
private MockNodeManager nodeManager;
@ -103,7 +101,8 @@ public class TestBlockManager implements EventHandler<Boolean> {
eventQueue = new EventQueue();
eventQueue.addHandler(SCMEvents.SAFE_MODE_STATUS,
scm.getSafeModeHandler());
eventQueue.addHandler(SCMEvents.START_REPLICATION, this);
eventQueue.addHandler(SCMEvents.SAFE_MODE_STATUS,
scm.getSafeModeHandler());
CloseContainerEventHandler closeContainerHandler =
new CloseContainerEventHandler(pipelineManager, mapping);
eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler);
@ -282,8 +281,4 @@ public class TestBlockManager implements EventHandler<Boolean> {
Assert.assertEquals(1, pipelineManager.getPipelines(type, factor).size());
}
@Override
public void onMessage(Boolean aBoolean, EventPublisher publisher) {
System.out.println("test");
}
}

View File

@ -74,13 +74,8 @@ public class TestCommandStatusReportHandler implements EventPublisher {
cmdStatusReportHandler.onMessage(report, this);
assertTrue(logCapturer.getOutput().contains("firing event of type " +
"Delete_Block_Status"));
assertTrue(logCapturer.getOutput().contains("firing event of type " +
"Replicate_Command_Status"));
assertTrue(logCapturer.getOutput().contains("type: " +
"deleteBlocksCommand"));
assertTrue(logCapturer.getOutput().contains("type: " +
"replicateContainerCommand"));
}

View File

@ -1,290 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.container.replication;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.stream.IntStream;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.container.replication
.ReplicationManager.ReplicationRequestToRepeat;
import org.apache.hadoop.hdds.scm.container.replication
.ReplicationManager.DeletionRequestToRepeat;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.lease.LeaseManager;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import static org.apache.hadoop.hdds.scm.events.SCMEvents
.TRACK_DELETE_CONTAINER_COMMAND;
import static org.apache.hadoop.hdds.scm.events.SCMEvents
.TRACK_REPLICATE_COMMAND;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.mockito.Matchers.anyObject;
import org.mockito.Mockito;
import static org.mockito.Mockito.when;
/**
* Test behaviour of the TestReplication.
*/
public class TestReplicationManager {
private EventQueue queue;
private List<ReplicationRequestToRepeat> trackReplicationEvents;
private List<DeletionRequestToRepeat> trackDeleteEvents;
private List<CommandForDatanode<ReplicateContainerCommandProto>> copyEvents;
private ContainerManager containerManager;
private ContainerPlacementPolicy containerPlacementPolicy;
private List<DatanodeDetails> listOfDatanodeDetails;
private List<ContainerReplica> listOfContainerReplica;
private LeaseManager<Long> leaseManager;
private ReplicationManager replicationManager;
@Before
public void initReplicationManager() throws IOException {
listOfDatanodeDetails = new ArrayList<>();
listOfContainerReplica = new ArrayList<>();
IntStream.range(1, 6).forEach(i -> {
DatanodeDetails dd = TestUtils.randomDatanodeDetails();
listOfDatanodeDetails.add(dd);
listOfContainerReplica.add(ContainerReplica.newBuilder()
.setContainerID(ContainerID.valueof(i))
.setContainerState(ContainerReplicaProto.State.CLOSED)
.setSequenceId(10000L)
.setOriginNodeId(dd.getUuid())
.setDatanodeDetails(dd).build());
});
containerPlacementPolicy =
(excludedNodes, nodesRequired, sizeRequired) -> listOfDatanodeDetails
.subList(2, 2 + nodesRequired);
containerManager = Mockito.mock(ContainerManager.class);
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setState(LifeCycleState.CLOSED)
.build();
when(containerManager.getContainer(anyObject()))
.thenReturn(containerInfo);
when(containerManager.getContainerReplicas(new ContainerID(1L)))
.thenReturn(new HashSet<>(Arrays.asList(
listOfContainerReplica.get(0),
listOfContainerReplica.get(1)
)));
when(containerManager.getContainerReplicas(new ContainerID(3L)))
.thenReturn(new HashSet<>());
queue = new EventQueue();
trackReplicationEvents = new ArrayList<>();
queue.addHandler(TRACK_REPLICATE_COMMAND,
(event, publisher) -> trackReplicationEvents.add(event));
trackDeleteEvents = new ArrayList<>();
queue.addHandler(TRACK_DELETE_CONTAINER_COMMAND,
(event, publisher) -> trackDeleteEvents.add(event));
copyEvents = new ArrayList<>();
queue.addHandler(SCMEvents.DATANODE_COMMAND,
(event, publisher) -> copyEvents.add(event));
leaseManager = new LeaseManager<>("Test", 100000L);
replicationManager = new ReplicationManager(containerPlacementPolicy,
containerManager, queue, leaseManager);
}
/**
* Container should be replicated but no source replicas.
*/
@Test()
public void testNoExistingReplicas() throws InterruptedException {
try {
leaseManager.start();
replicationManager.start();
//WHEN
queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
new ReplicationRequest(3L, (short) 2, System.currentTimeMillis(),
(short) 3));
Thread.sleep(500L);
queue.processAll(1000L);
//THEN
Assert.assertEquals(0, trackReplicationEvents.size());
Assert.assertEquals(0, copyEvents.size());
} finally {
if (leaseManager != null) {
leaseManager.shutdown();
}
}
}
@Test
public void testOverReplication() throws ContainerNotFoundException,
InterruptedException {
try {
leaseManager.start();
replicationManager.start();
final ContainerID containerID = ContainerID.valueof(5L);
final ContainerReplica duplicateReplicaOne = ContainerReplica.newBuilder()
.setContainerID(containerID)
.setContainerState(ContainerReplicaProto.State.CLOSED)
.setSequenceId(10000L)
.setOriginNodeId(listOfDatanodeDetails.get(0).getUuid())
.setDatanodeDetails(listOfDatanodeDetails.get(3))
.build();
final ContainerReplica duplicateReplicaTwo = ContainerReplica.newBuilder()
.setContainerID(containerID)
.setContainerState(ContainerReplicaProto.State.CLOSED)
.setSequenceId(10000L)
.setOriginNodeId(listOfDatanodeDetails.get(1).getUuid())
.setDatanodeDetails(listOfDatanodeDetails.get(4))
.build();
when(containerManager.getContainerReplicas(new ContainerID(5L)))
.thenReturn(new HashSet<>(Arrays.asList(
listOfContainerReplica.get(0),
listOfContainerReplica.get(1),
listOfContainerReplica.get(2),
duplicateReplicaOne,
duplicateReplicaTwo
)));
queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
new ReplicationRequest(5L, (short) 5, System.currentTimeMillis(),
(short) 3));
Thread.sleep(500L);
queue.processAll(1000L);
//THEN
Assert.assertEquals(2, trackDeleteEvents.size());
Assert.assertEquals(2, copyEvents.size());
} finally {
if (leaseManager != null) {
leaseManager.shutdown();
}
}
}
@Test
public void testEventSending() throws InterruptedException, IOException {
//GIVEN
try {
leaseManager.start();
replicationManager.start();
//WHEN
queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
new ReplicationRequest(1L, (short) 2, System.currentTimeMillis(),
(short) 3));
Thread.sleep(500L);
queue.processAll(1000L);
//THEN
Assert.assertEquals(1, trackReplicationEvents.size());
Assert.assertEquals(1, copyEvents.size());
} finally {
if (leaseManager != null) {
leaseManager.shutdown();
}
}
}
@Test
public void testCommandWatcher() throws InterruptedException, IOException {
LeaseManager<Long> rapidLeaseManager =
new LeaseManager<>("Test", 1000L);
replicationManager = new ReplicationManager(containerPlacementPolicy,
containerManager, queue, rapidLeaseManager);
try {
leaseManager.start();
rapidLeaseManager.start();
replicationManager.start();
queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
new ReplicationRequest(1L, (short) 2, System.currentTimeMillis(),
(short) 3));
Thread.sleep(500L);
queue.processAll(1000L);
Assert.assertEquals(1, trackReplicationEvents.size());
Assert.assertEquals(1, copyEvents.size());
Assert.assertEquals(trackReplicationEvents.get(0).getId(),
copyEvents.get(0).getCommand().getId());
//event is timed out
Thread.sleep(1500);
queue.processAll(1000L);
//original copy command + retry
Assert.assertEquals(2, trackReplicationEvents.size());
Assert.assertEquals(2, copyEvents.size());
} finally {
rapidLeaseManager.shutdown();
if (leaseManager != null) {
leaseManager.shutdown();
}
}
}
}

View File

@ -1,134 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.container.replication;
import java.util.Random;
import java.util.UUID;
import org.apache.hadoop.util.Time;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Test class for ReplicationQueue.
*/
public class TestReplicationQueue {
private ReplicationQueue replicationQueue;
private Random random;
@Before
public void setUp() {
replicationQueue = new ReplicationQueue();
random = new Random();
}
@Test
public void testDuplicateAddOp() throws InterruptedException {
long contId = random.nextLong();
String nodeId = UUID.randomUUID().toString();
ReplicationRequest obj1, obj2, obj3;
long time = Time.monotonicNow();
obj1 = new ReplicationRequest(contId, (short) 2, time, (short) 3);
obj2 = new ReplicationRequest(contId, (short) 2, time + 1, (short) 3);
obj3 = new ReplicationRequest(contId, (short) 1, time+2, (short) 3);
replicationQueue.add(obj1);
replicationQueue.add(obj2);
replicationQueue.add(obj3);
Assert.assertEquals("Should add only 1 msg as second one is duplicate",
1, replicationQueue.size());
ReplicationRequest temp = replicationQueue.take();
Assert.assertEquals(temp, obj3);
}
@Test
public void testPollOp() throws InterruptedException {
long contId = random.nextLong();
String nodeId = UUID.randomUUID().toString();
ReplicationRequest msg1, msg2, msg3, msg4, msg5;
msg1 = new ReplicationRequest(contId, (short) 1, Time.monotonicNow(),
(short) 3);
long time = Time.monotonicNow();
msg2 = new ReplicationRequest(contId + 1, (short) 4, time, (short) 3);
msg3 = new ReplicationRequest(contId + 2, (short) 0, time, (short) 3);
msg4 = new ReplicationRequest(contId, (short) 2, time, (short) 3);
// Replication message for same container but different nodeId
msg5 = new ReplicationRequest(contId + 1, (short) 2, time, (short) 3);
replicationQueue.add(msg1);
replicationQueue.add(msg2);
replicationQueue.add(msg3);
replicationQueue.add(msg4);
replicationQueue.add(msg5);
Assert.assertEquals("Should have 3 objects",
3, replicationQueue.size());
// Since Priority queue orders messages according to replication count,
// message with lowest replication should be first
ReplicationRequest temp;
temp = replicationQueue.take();
Assert.assertEquals("Should have 2 objects",
2, replicationQueue.size());
Assert.assertEquals(temp, msg3);
temp = replicationQueue.take();
Assert.assertEquals("Should have 1 objects",
1, replicationQueue.size());
Assert.assertEquals(temp, msg5);
// Message 2 should be ordered before message 5 as both have same
// replication number but message 2 has earlier timestamp.
temp = replicationQueue.take();
Assert.assertEquals("Should have 0 objects",
replicationQueue.size(), 0);
Assert.assertEquals(temp, msg4);
}
@Test
public void testRemoveOp() {
long contId = random.nextLong();
String nodeId = UUID.randomUUID().toString();
ReplicationRequest obj1, obj2, obj3;
obj1 = new ReplicationRequest(contId, (short) 1, Time.monotonicNow(),
(short) 3);
obj2 = new ReplicationRequest(contId + 1, (short) 2, Time.monotonicNow(),
(short) 3);
obj3 = new ReplicationRequest(contId + 2, (short) 3, Time.monotonicNow(),
(short) 3);
replicationQueue.add(obj1);
replicationQueue.add(obj2);
replicationQueue.add(obj3);
Assert.assertEquals("Should have 3 objects",
3, replicationQueue.size());
replicationQueue.remove(obj3);
Assert.assertEquals("Should have 2 objects",
2, replicationQueue.size());
replicationQueue.remove(obj2);
Assert.assertEquals("Should have 1 objects",
1, replicationQueue.size());
replicationQueue.remove(obj1);
Assert.assertEquals("Should have 0 objects",
0, replicationQueue.size());
}
}

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
@ -62,7 +63,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.event.Level;
/**
* Test DeadNodeHandler.
@ -95,7 +95,8 @@ public class TestDeadNodeHandler {
manager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
mockRatisProvider);
containerManager = scm.getContainerManager();
deadNodeHandler = new DeadNodeHandler(nodeManager, containerManager);
deadNodeHandler = new DeadNodeHandler(nodeManager,
Mockito.mock(PipelineManager.class), containerManager);
eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
publisher = Mockito.mock(EventPublisher.class);
nodeReportHandler = new NodeReportHandler(nodeManager);
@ -168,10 +169,6 @@ public class TestDeadNodeHandler {
TestUtils.closeContainer(containerManager, container2.containerID());
TestUtils.quasiCloseContainer(containerManager, container3.containerID());
GenericTestUtils.setLogLevel(DeadNodeHandler.getLogger(), Level.DEBUG);
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
.captureLogs(DeadNodeHandler.getLogger());
deadNodeHandler.onMessage(datanode1, publisher);
Set<ContainerReplica> container1Replicas = containerManager
@ -191,60 +188,6 @@ public class TestDeadNodeHandler {
Assert.assertEquals(1, container3Replicas.size());
Assert.assertEquals(datanode3,
container3Replicas.iterator().next().getDatanodeDetails());
// Replicate should be fired for container 1 and container 2 as now
// datanode 1 is dead, these 2 will not match with expected replica count
// and their state is one of CLOSED/QUASI_CLOSE.
Assert.assertTrue(logCapturer.getOutput().contains(
"Replicate Request fired for container " +
container1.getContainerID()));
Assert.assertTrue(logCapturer.getOutput().contains(
"Replicate Request fired for container " +
container2.getContainerID()));
// as container4 is still in open state, replicate event should not have
// fired for this.
Assert.assertFalse(logCapturer.getOutput().contains(
"Replicate Request fired for container " +
container4.getContainerID()));
}
@Test
public void testOnMessageReplicaFailure() throws Exception {
DatanodeDetails datanode1 = TestUtils.randomDatanodeDetails();
DatanodeDetails datanode2 = TestUtils.randomDatanodeDetails();
DatanodeDetails datanode3 = TestUtils.randomDatanodeDetails();
String storagePath = GenericTestUtils.getRandomizedTempPath()
.concat("/" + datanode1.getUuidString());
StorageReportProto storageOne = TestUtils.createStorageReport(
datanode1.getUuid(), storagePath, 100, 10, 90, null);
nodeManager.register(datanode1,
TestUtils.createNodeReport(storageOne), null);
nodeManager.register(datanode2,
TestUtils.createNodeReport(storageOne), null);
nodeManager.register(datanode3,
TestUtils.createNodeReport(storageOne), null);
DatanodeDetails dn1 = TestUtils.randomDatanodeDetails();
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
.captureLogs(DeadNodeHandler.getLogger());
nodeReportHandler.onMessage(getNodeReport(dn1, storageOne),
Mockito.mock(EventPublisher.class));
ContainerInfo container1 =
TestUtils.allocateContainer(containerManager);
TestUtils.closeContainer(containerManager, container1.containerID());
deadNodeHandler.onMessage(dn1, eventQueue);
Assert.assertTrue(logCapturer.getOutput().contains(
"DeadNode event for a unregistered node"));
}
private void registerReplicas(ContainerManager contManager,

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.NodeReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@ -69,7 +70,8 @@ public class TestStatisticsUpdate {
final StorageContainerManager scm = HddsTestUtils.getScm(conf);
nodeManager = scm.getScmNodeManager();
final DeadNodeHandler deadNodeHandler = new DeadNodeHandler(
nodeManager, scm.getContainerManager());
nodeManager, Mockito.mock(PipelineManager.class),
scm.getContainerManager());
eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
nodeReportHandler = new NodeReportHandler(nodeManager);
}

View File

@ -421,15 +421,10 @@ public class TestEndPoint {
serverAddress, 3000);
Map<Long, CommandStatus> map = stateContext.getCommandStatusMap();
assertNotNull(map);
assertEquals("Should have 2 objects", 2, map.size());
assertTrue(map.containsKey(Long.valueOf(2)));
assertTrue(map.containsKey(Long.valueOf(3)));
assertTrue(map.get(Long.valueOf(2)).getType()
.equals(Type.replicateContainerCommand));
assertTrue(
map.get(Long.valueOf(3)).getType().equals(Type.deleteBlocksCommand));
assertTrue(map.get(Long.valueOf(2)).getStatus().equals(Status.PENDING));
assertTrue(map.get(Long.valueOf(3)).getStatus().equals(Status.PENDING));
assertEquals("Should have 1 objects", 1, map.size());
assertTrue(map.containsKey(3L));
assertEquals(Type.deleteBlocksCommand, map.get(3L).getType());
assertEquals(Status.PENDING, map.get(3L).getStatus());
scmServerImpl.clearScmCommandRequests();
}