diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index 6e940adbc3e..e337d2fb49c 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -251,6 +251,13 @@ public final class ScmConfigKeys { public static final String OZONE_SCM_CONTAINER_CLOSE_THRESHOLD = "ozone.scm.container.close.threshold"; public static final float OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT = 0.9f; + + public static final String HDDS_SCM_WATCHER_TIMEOUT = + "hdds.scm.watcher.timeout"; + + public static final String HDDS_SCM_WATCHER_TIMEOUT_DEFAULT = + "10m"; + /** * Never constructed. */ diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 027367725bf..92f0c41c014 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.scm.ScmConfigKeys; + import org.apache.ratis.util.TimeDuration; /** diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 84a3e0c230f..6ddf3c6b855 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -1108,4 +1108,14 @@ + + hdds.scm.watcher.timeout + 10m + OZONE, SCM, MANAGEMENT + + Timeout for the watchers of the HDDS SCM CommandWatchers. After this + duration the Copy/Delete container commands will be sent again to the + datanode unless the datanode confirms the completion. + + \ No newline at end of file diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java index 473c152b1d4..38386d45df9 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java @@ -180,9 +180,9 @@ public abstract class EventWatcher getTimeoutEvents( Predicate predicate) { diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java index c2159ad1557..62e24194fd5 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java @@ -48,4 +48,9 @@ public class TypedEvent implements Event { return name; } + @Override + public String toString() { + return "TypedEvent{" + "payloadType=" + payloadType + ", name='" + name + + '\'' + '}'; + } } diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java index 8f18478da94..786b7b8a5ac 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java @@ -216,12 +216,12 @@ public class TestEventWatcher { } @Override - void onTimeout(EventPublisher publisher, UnderreplicatedEvent payload) { + protected void onTimeout(EventPublisher publisher, UnderreplicatedEvent payload) { publisher.fireEvent(UNDER_REPLICATED, payload); } @Override - void onFinished(EventPublisher publisher, UnderreplicatedEvent payload) { + protected void onFinished(EventPublisher publisher, UnderreplicatedEvent payload) { //Good job. We did it. } @@ -231,8 +231,6 @@ public class TestEventWatcher { } } - ; - private static class ReplicationCompletedEvent implements IdentifiableEventPayload { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java index 5d91ac5dad1..3336c8e80e7 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java @@ -31,11 +31,14 @@ public interface ContainerPlacementPolicy { /** * Given the replication factor and size required, return set of datanodes * that satisfy the nodes and size requirement. + * + * @param excludedNodes - list of nodes to be excluded. * @param nodesRequired - number of datanodes required. * @param sizeRequired - size required for the container or block. * @return list of datanodes chosen. * @throws IOException */ - List chooseDatanodes(int nodesRequired, long sizeRequired) + List chooseDatanodes(List excludedNodes, + int nodesRequired, long sizeRequired) throws IOException; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java index 0a595d55ba5..ba241dcabcf 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java @@ -95,16 +95,20 @@ public abstract class SCMCommonPolicy implements ContainerPlacementPolicy { * 3. if a set of containers are requested, we either meet the required * number of nodes or we fail that request. * + * + * @param excludedNodes - datanodes with existing replicas * @param nodesRequired - number of datanodes required. * @param sizeRequired - size required for the container or block. * @return list of datanodes chosen. * @throws SCMException SCM exception. */ - public List chooseDatanodes(int nodesRequired, final long - sizeRequired) throws SCMException { + public List chooseDatanodes( + List excludedNodes, + int nodesRequired, final long sizeRequired) throws SCMException { List healthyNodes = nodeManager.getNodes(HddsProtos.NodeState.HEALTHY); + healthyNodes.removeAll(excludedNodes); String msg; if (healthyNodes.size() == 0) { msg = "No healthy node found to allocate container."; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java index 85a6b544cce..8df8f6e034d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java @@ -17,17 +17,18 @@ package org.apache.hadoop.hdds.scm.container.placement.algorithms; -import com.google.common.annotations.VisibleForTesting; +import java.util.List; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.NodeManager; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; + +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; - /** * Container placement policy that randomly choose datanodes with remaining * space to satisfy the size constraints. @@ -83,6 +84,8 @@ public final class SCMContainerPlacementCapacity extends SCMCommonPolicy { /** * Called by SCM to choose datanodes. * + * + * @param excludedNodes - list of the datanodes to exclude. * @param nodesRequired - number of datanodes required. * @param sizeRequired - size required for the container or block. * @return List of datanodes. @@ -90,9 +93,10 @@ public final class SCMContainerPlacementCapacity extends SCMCommonPolicy { */ @Override public List chooseDatanodes( - final int nodesRequired, final long sizeRequired) throws SCMException { + List excludedNodes, final int nodesRequired, + final long sizeRequired) throws SCMException { List healthyNodes = - super.chooseDatanodes(nodesRequired, sizeRequired); + super.chooseDatanodes(excludedNodes, nodesRequired, sizeRequired); if (healthyNodes.size() == nodesRequired) { return healthyNodes; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java index 9903c84e317..76702d555ef 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java @@ -56,6 +56,8 @@ public final class SCMContainerPlacementRandom extends SCMCommonPolicy /** * Choose datanodes called by the SCM to choose the datanode. * + * + * @param excludedNodes - list of the datanodes to exclude. * @param nodesRequired - number of datanodes required. * @param sizeRequired - size required for the container or block. * @return List of Datanodes. @@ -63,9 +65,10 @@ public final class SCMContainerPlacementRandom extends SCMCommonPolicy */ @Override public List chooseDatanodes( - final int nodesRequired, final long sizeRequired) throws SCMException { + List excludedNodes, final int nodesRequired, + final long sizeRequired) throws SCMException { List healthyNodes = - super.chooseDatanodes(nodesRequired, sizeRequired); + super.chooseDatanodes(excludedNodes, nodesRequired, sizeRequired); if (healthyNodes.size() == nodesRequired) { return healthyNodes; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationCommandWatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationCommandWatcher.java new file mode 100644 index 00000000000..03a81a7db86 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationCommandWatcher.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.container.replication; + +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager + .ReplicationCompleted; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager + .ReplicationRequestToRepeat; +import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.server.events.Event; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.hdds.server.events.EventWatcher; +import org.apache.hadoop.ozone.lease.LeaseManager; + +/** + * Command watcher to track the replication commands. + */ +public class ReplicationCommandWatcher + extends + EventWatcher { + + public ReplicationCommandWatcher(Event startEvent, + Event completionEvent, + LeaseManager leaseManager) { + super(startEvent, completionEvent, leaseManager); + } + + @Override + protected void onTimeout(EventPublisher publisher, + ReplicationRequestToRepeat payload) { + //put back to the original queue + publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER, + payload.getRequest()); + } + + @Override + protected void onFinished(EventPublisher publisher, + ReplicationRequestToRepeat payload) { + + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java new file mode 100644 index 00000000000..5f787224bea --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.container.replication; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ThreadFactory; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerStateManager; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.placement.algorithms + .ContainerPlacementPolicy; +import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.hdds.server.events.EventQueue; +import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload; +import org.apache.hadoop.ozone.lease.LeaseManager; +import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; +import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import static org.apache.hadoop.hdds.scm.events.SCMEvents + .TRACK_REPLICATE_COMMAND; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Replication Manager manages the replication of the closed container. + */ +public class ReplicationManager implements Runnable { + + private static final Logger LOG = + LoggerFactory.getLogger(ReplicationManager.class); + + private ReplicationQueue replicationQueue; + + private ContainerPlacementPolicy containerPlacement; + + private EventPublisher eventPublisher; + + private ReplicationCommandWatcher replicationCommandWatcher; + + private boolean running = true; + + private ContainerStateManager containerStateManager; + + public ReplicationManager(ContainerPlacementPolicy containerPlacement, + ContainerStateManager containerStateManager, EventQueue eventQueue, + LeaseManager commandWatcherLeaseManager) { + + this.containerPlacement = containerPlacement; + this.containerStateManager = containerStateManager; + this.eventPublisher = eventQueue; + + this.replicationCommandWatcher = + new ReplicationCommandWatcher(TRACK_REPLICATE_COMMAND, + SCMEvents.REPLICATION_COMPLETE, commandWatcherLeaseManager); + + this.replicationQueue = new ReplicationQueue(); + + eventQueue.addHandler(SCMEvents.REPLICATE_CONTAINER, + (replicationRequest, publisher) -> replicationQueue + .add(replicationRequest)); + + this.replicationCommandWatcher.start(eventQueue); + + } + + public void start() { + + ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Replication Manager").build(); + + threadFactory.newThread(this).start(); + } + + public void run() { + + while (running) { + ReplicationRequest request = null; + try { + //TODO: add throttling here + request = replicationQueue.take(); + + ContainerID containerID = new ContainerID(request.getContainerId()); + ContainerInfo containerInfo = + containerStateManager.getContainer(containerID); + + Preconditions.checkNotNull(containerInfo, + "No information about the container " + request.getContainerId()); + + Preconditions + .checkState(containerInfo.getState() == LifeCycleState.CLOSED, + "Container should be in closed state"); + + //check the current replication + List datanodesWithReplicas = + getCurrentReplicas(request); + + ReplicationRequest finalRequest = request; + + int inFlightReplications = replicationCommandWatcher.getTimeoutEvents( + e -> e.request.getContainerId() == finalRequest.getContainerId()) + .size(); + + int deficit = + request.getExpecReplicationCount() - datanodesWithReplicas.size() + - inFlightReplications; + + if (deficit > 0) { + + List selectedDatanodes = containerPlacement + .chooseDatanodes(datanodesWithReplicas, deficit, + containerInfo.getUsedBytes()); + + //send the command + for (DatanodeDetails datanode : selectedDatanodes) { + + ReplicateContainerCommand replicateCommand = + new ReplicateContainerCommand(containerID.getId(), + datanodesWithReplicas); + + eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, + new CommandForDatanode<>( + datanode.getUuid(), replicateCommand)); + + ReplicationRequestToRepeat timeoutEvent = + new ReplicationRequestToRepeat(replicateCommand.getId(), + request); + + eventPublisher.fireEvent(TRACK_REPLICATE_COMMAND, timeoutEvent); + + } + + } else if (deficit < 0) { + //TODO: too many replicas. Not handled yet. + } + + } catch (Exception e) { + LOG.error("Can't replicate container {}", request, e); + } + } + + } + + @VisibleForTesting + protected List getCurrentReplicas(ReplicationRequest request) + throws IOException { + //TODO: replication information is not yet available after HDDS-175, + // should be fixed after HDDS-228 + return new ArrayList<>(); + } + + @VisibleForTesting + public ReplicationQueue getReplicationQueue() { + return replicationQueue; + } + + public void stop() { + running = false; + } + + /** + * Event for the ReplicationCommandWatcher to repeate the embedded request + * in case fof timeout. + */ + public static class ReplicationRequestToRepeat + implements IdentifiableEventPayload { + + private final long commandId; + + private final ReplicationRequest request; + + public ReplicationRequestToRepeat(long commandId, + ReplicationRequest request) { + this.commandId = commandId; + this.request = request; + } + + public ReplicationRequest getRequest() { + return request; + } + + @Override + public long getId() { + return commandId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ReplicationRequestToRepeat that = (ReplicationRequestToRepeat) o; + return Objects.equals(request, that.request); + } + + @Override + public int hashCode() { + + return Objects.hash(request); + } + } + + public static class ReplicationCompleted implements IdentifiableEventPayload { + + private final long uuid; + + public ReplicationCompleted(long uuid) { + this.uuid = uuid; + } + + @Override + public long getId() { + return uuid; + } + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java similarity index 68% rename from hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java rename to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java index e0a235122e8..4ca67be4e11 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java @@ -15,11 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.ozone.container.replication; +package org.apache.hadoop.hdds.scm.container.replication; import java.util.List; -import java.util.PriorityQueue; -import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; /** * Priority queue to handle under-replicated and over replicated containers @@ -28,13 +28,13 @@ import java.util.Queue; */ public class ReplicationQueue { - private final Queue queue; + private final BlockingQueue queue; - ReplicationQueue() { - queue = new PriorityQueue<>(); + public ReplicationQueue() { + queue = new PriorityBlockingQueue<>(); } - public synchronized boolean add(ReplicationRequest repObj) { + public boolean add(ReplicationRequest repObj) { if (this.queue.contains(repObj)) { // Remove the earlier message and insert this one this.queue.remove(repObj); @@ -42,7 +42,7 @@ public class ReplicationQueue { return this.queue.add(repObj); } - public synchronized boolean remove(ReplicationRequest repObj) { + public boolean remove(ReplicationRequest repObj) { return queue.remove(repObj); } @@ -52,21 +52,18 @@ public class ReplicationQueue { * * @return the head of this queue, or {@code null} if this queue is empty */ - public synchronized ReplicationRequest peek() { + public ReplicationRequest peek() { return queue.peek(); } /** - * Retrieves and removes the head of this queue, - * or returns {@code null} if this queue is empty. - * - * @return the head of this queue, or {@code null} if this queue is empty + * Retrieves and removes the head of this queue (blocking queue). */ - public synchronized ReplicationRequest poll() { - return queue.poll(); + public ReplicationRequest take() throws InterruptedException { + return queue.take(); } - public synchronized boolean removeAll(List repObjs) { + public boolean removeAll(List repObjs) { return queue.removeAll(repObjs); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationRequest.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java similarity index 98% rename from hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationRequest.java rename to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java index a6ccce13e0b..ef7c546641e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationRequest.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java @@ -15,9 +15,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.ozone.container.replication; +package org.apache.hadoop.hdds.scm.container.replication; import java.io.Serializable; + import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java similarity index 87% rename from hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java rename to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java index 7f335e37c12..934b01e6231 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java @@ -16,8 +16,8 @@ * limitations under the License. */ -package org.apache.hadoop.ozone.container.replication; +package org.apache.hadoop.hdds.scm.container.replication; /** - * Ozone Container replicaton related classes. + * HDDS (Closed) Container replicaton related classes. */ \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java index 46f1588b045..ad1702bcc4f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java @@ -28,6 +28,10 @@ import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher .ContainerReportFromDatanode; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher .NodeReportFromDatanode; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager + .ReplicationCompleted; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; import org.apache.hadoop.hdds.server.events.Event; import org.apache.hadoop.hdds.server.events.TypedEvent; @@ -128,6 +132,33 @@ public final class SCMEvents { new TypedEvent(DeleteBlockCommandStatus.class, "DeleteBlockCommandStatus"); + /** + * This is the command for ReplicationManager to handle under/over + * replication. Sent by the ContainerReportHandler after processing the + * heartbeat. + */ + public static final TypedEvent REPLICATE_CONTAINER = + new TypedEvent<>(ReplicationRequest.class); + + /** + * This event is sent by the ReplicaManager to the + * ReplicationCommandWatcher to track the in-progress replication. + */ + public static final TypedEvent + TRACK_REPLICATE_COMMAND = + new TypedEvent<>(ReplicationManager.ReplicationRequestToRepeat.class); + /** + * This event comes from the Heartbeat dispatcher (in fact from the + * datanode) to notify the scm that the replication is done. This is + * received by the replicate command watcher to mark in-progress task as + * finished. +

+ * TODO: Temporary event, should be replaced by specific Heartbeat + * ActionRequred event. + */ + public static final TypedEvent REPLICATION_COMPLETE = + new TypedEvent<>(ReplicationCompleted.class); + /** * Private Ctor. Never Constructed. */ diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index aba64101741..f4cd448f160 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -31,6 +31,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.block.BlockManager; import org.apache.hadoop.hdds.scm.block.BlockManagerImpl; import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler; @@ -38,7 +39,12 @@ import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler; import org.apache.hadoop.hdds.scm.container.ContainerMapping; import org.apache.hadoop.hdds.scm.container.ContainerReportHandler; import org.apache.hadoop.hdds.scm.container.Mapping; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.placement.algorithms + .ContainerPlacementPolicy; +import org.apache.hadoop.hdds.scm.container.placement.algorithms + .SCMContainerPlacementCapacity; import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMMetrics; import org.apache.hadoop.hdds.scm.events.SCMEvents; @@ -61,9 +67,13 @@ import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.common.Storage.StorageState; import org.apache.hadoop.ozone.common.StorageInfo; +import org.apache.hadoop.ozone.lease.LeaseManager; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.StringUtils; + +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .HDDS_SCM_WATCHER_TIMEOUT_DEFAULT; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -153,6 +163,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl * Key = DatanodeUuid, value = ContainerStat. */ private Cache containerReportCache; + private final ReplicationManager replicationManager; + private final LeaseManager commandWatcherLeaseManager; /** * Creates a new StorageContainerManager. Configuration will be updated @@ -207,6 +219,20 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler); eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT, cmdStatusReportHandler); + long watcherTimeout = + conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT, + HDDS_SCM_WATCHER_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); + + commandWatcherLeaseManager = new LeaseManager<>(watcherTimeout); + + //TODO: support configurable containerPlacement policy + ContainerPlacementPolicy containerPlacementPolicy = + new SCMContainerPlacementCapacity(scmNodeManager, conf); + + replicationManager = new ReplicationManager(containerPlacementPolicy, + scmContainerManager.getStateManager(), eventQueue, + commandWatcherLeaseManager); + scmAdminUsernames = conf.getTrimmedStringCollection(OzoneConfigKeys .OZONE_ADMINISTRATORS); scmUsername = UserGroupInformation.getCurrentUser().getUserName(); @@ -552,7 +578,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl httpServer.start(); scmBlockManager.start(); - + replicationManager.start(); setStartTime(); } @@ -561,6 +587,20 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl */ public void stop() { + try { + LOG.info("Stopping Replication Manager Service."); + replicationManager.stop(); + } catch (Exception ex) { + LOG.error("Replication manager service stop failed.", ex); + } + + try { + LOG.info("Stopping Lease Manager of the command watchers"); + commandWatcherLeaseManager.shutdown(); + } catch (Exception ex) { + LOG.error("Lease Manager of the command watchers stop failed"); + } + try { LOG.info("Stopping datanode service RPC server"); getDatanodeProtocolServer().stop(); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java new file mode 100644 index 00000000000..5966f2a6c1a --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.container.placement.algorithms; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.node.NodeManager; + +import org.junit.Assert; +import org.junit.Test; +import static org.mockito.Matchers.anyObject; +import org.mockito.Mockito; +import static org.mockito.Mockito.when; + +public class TestSCMContainerPlacementCapacity { + @Test + public void chooseDatanodes() throws SCMException { + //given + Configuration conf = new OzoneConfiguration(); + + List datanodes = new ArrayList<>(); + for (int i = 0; i < 7; i++) { + datanodes.add(TestUtils.getDatanodeDetails()); + } + + NodeManager mockNodeManager = Mockito.mock(NodeManager.class); + when(mockNodeManager.getNodes(NodeState.HEALTHY)) + .thenReturn(new ArrayList<>(datanodes)); + + when(mockNodeManager.getNodeStat(anyObject())) + .thenReturn(new SCMNodeMetric(100l, 0L, 100L)); + when(mockNodeManager.getNodeStat(datanodes.get(2))) + .thenReturn(new SCMNodeMetric(100l, 90L, 10L)); + when(mockNodeManager.getNodeStat(datanodes.get(3))) + .thenReturn(new SCMNodeMetric(100l, 80L, 20L)); + when(mockNodeManager.getNodeStat(datanodes.get(4))) + .thenReturn(new SCMNodeMetric(100l, 70L, 30L)); + + SCMContainerPlacementCapacity scmContainerPlacementRandom = + new SCMContainerPlacementCapacity(mockNodeManager, conf); + + List existingNodes = new ArrayList<>(); + existingNodes.add(datanodes.get(0)); + existingNodes.add(datanodes.get(1)); + + Map selectedCount = new HashMap<>(); + for (DatanodeDetails datanode : datanodes) { + selectedCount.put(datanode, 0); + } + + for (int i = 0; i < 1000; i++) { + + //when + List datanodeDetails = + scmContainerPlacementRandom.chooseDatanodes(existingNodes, 1, 15); + + //then + Assert.assertEquals(1, datanodeDetails.size()); + DatanodeDetails datanode0Details = datanodeDetails.get(0); + + Assert.assertNotEquals( + "Datanode 0 should not been selected: excluded by parameter", + datanodes.get(0), datanode0Details); + Assert.assertNotEquals( + "Datanode 1 should not been selected: excluded by parameter", + datanodes.get(1), datanode0Details); + Assert.assertNotEquals( + "Datanode 2 should not been selected: not enough space there", + datanodes.get(2), datanode0Details); + + selectedCount + .put(datanode0Details, selectedCount.get(datanode0Details) + 1); + + } + + //datanode 4 has less space. Should be selected less times. + Assert.assertTrue(selectedCount.get(datanodes.get(3)) > selectedCount + .get(datanodes.get(6))); + Assert.assertTrue(selectedCount.get(datanodes.get(4)) > selectedCount + .get(datanodes.get(6))); + } +} \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java new file mode 100644 index 00000000000..430c181205f --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.container.placement.algorithms; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.node.NodeManager; + +import org.junit.Assert; +import org.junit.Test; +import static org.mockito.Matchers.anyObject; +import org.mockito.Mockito; +import static org.mockito.Mockito.when; + +public class TestSCMContainerPlacementRandom { + + @Test + public void chooseDatanodes() throws SCMException { + //given + Configuration conf = new OzoneConfiguration(); + + List datanodes = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + datanodes.add(TestUtils.getDatanodeDetails()); + } + + NodeManager mockNodeManager = Mockito.mock(NodeManager.class); + when(mockNodeManager.getNodes(NodeState.HEALTHY)) + .thenReturn(new ArrayList<>(datanodes)); + + when(mockNodeManager.getNodeStat(anyObject())) + .thenReturn(new SCMNodeMetric(100l, 0l, 100l)); + when(mockNodeManager.getNodeStat(datanodes.get(2))) + .thenReturn(new SCMNodeMetric(100l, 90l, 10l)); + + SCMContainerPlacementRandom scmContainerPlacementRandom = + new SCMContainerPlacementRandom(mockNodeManager, conf); + + List existingNodes = new ArrayList<>(); + existingNodes.add(datanodes.get(0)); + existingNodes.add(datanodes.get(1)); + + for (int i = 0; i < 100; i++) { + //when + List datanodeDetails = + scmContainerPlacementRandom.chooseDatanodes(existingNodes, 1, 15); + + //then + Assert.assertEquals(1, datanodeDetails.size()); + DatanodeDetails datanode0Details = datanodeDetails.get(0); + + Assert.assertNotEquals( + "Datanode 0 should not been selected: excluded by parameter", + datanodes.get(0), datanode0Details); + Assert.assertNotEquals( + "Datanode 1 should not been selected: excluded by parameter", + datanodes.get(1), datanode0Details); + Assert.assertNotEquals( + "Datanode 2 should not been selected: not enough space there", + datanodes.get(2), datanode0Details); + + } + } +} \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java new file mode 100644 index 00000000000..e3e876b5e6b --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java @@ -0,0 +1,215 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.container.replication; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.UUID; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.container.ContainerStateManager; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.placement.algorithms + .ContainerPlacementPolicy; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager + .ReplicationRequestToRepeat; +import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.server.events.EventQueue; +import org.apache.hadoop.ozone.lease.LeaseManager; +import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; + +import com.google.common.base.Preconditions; +import static org.apache.hadoop.hdds.scm.events.SCMEvents + .TRACK_REPLICATE_COMMAND; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import static org.mockito.Matchers.anyObject; +import org.mockito.Mockito; +import static org.mockito.Mockito.when; + +/** + * Test behaviour of the TestReplication. + */ +public class TestReplicationManager { + + private EventQueue queue; + + private List trackReplicationEvents; + + private List> copyEvents; + + private ContainerStateManager containerStateManager; + + private ContainerPlacementPolicy containerPlacementPolicy; + private List listOfDatanodeDetails; + + @Before + public void initReplicationManager() throws IOException { + + listOfDatanodeDetails = TestUtils.getListOfDatanodeDetails(5); + + containerPlacementPolicy = + (excludedNodes, nodesRequired, sizeRequired) -> listOfDatanodeDetails + .subList(2, 2 + nodesRequired); + + containerStateManager = Mockito.mock(ContainerStateManager.class); + + //container with 2 replicas + ContainerInfo containerInfo = new ContainerInfo.Builder() + .setState(LifeCycleState.CLOSED) + .build(); + + when(containerStateManager.getContainer(anyObject())) + .thenReturn(containerInfo); + + queue = new EventQueue(); + + trackReplicationEvents = new ArrayList<>(); + queue.addHandler(TRACK_REPLICATE_COMMAND, + (event, publisher) -> trackReplicationEvents.add(event)); + + copyEvents = new ArrayList<>(); + queue.addHandler(SCMEvents.DATANODE_COMMAND, + (event, publisher) -> copyEvents.add(event)); + + } + + @Test + public void testEventSending() throws InterruptedException, IOException { + + + //GIVEN + + LeaseManager leaseManager = new LeaseManager<>(100000L); + try { + leaseManager.start(); + + ReplicationManager replicationManager = + new ReplicationManager(containerPlacementPolicy, + containerStateManager, + queue, leaseManager) { + @Override + protected List getCurrentReplicas( + ReplicationRequest request) throws IOException { + return listOfDatanodeDetails.subList(0, 2); + } + }; + replicationManager.start(); + + //WHEN + + queue.fireEvent(SCMEvents.REPLICATE_CONTAINER, + new ReplicationRequest(1l, (short) 2, System.currentTimeMillis(), + (short) 3)); + + Thread.sleep(500L); + queue.processAll(1000L); + + //THEN + + Assert.assertEquals(1, trackReplicationEvents.size()); + Assert.assertEquals(1, copyEvents.size()); + } finally { + if (leaseManager != null) { + leaseManager.shutdown(); + } + } + } + + @Test + public void testCommandWatcher() throws InterruptedException, IOException { + + Logger.getRootLogger().setLevel(Level.DEBUG); + LeaseManager leaseManager = new LeaseManager<>(1000L); + + try { + leaseManager.start(); + + ReplicationManager replicationManager = + new ReplicationManager(containerPlacementPolicy, containerStateManager, + + + queue, leaseManager) { + @Override + protected List getCurrentReplicas( + ReplicationRequest request) throws IOException { + return listOfDatanodeDetails.subList(0, 2); + } + }; + replicationManager.start(); + + queue.fireEvent(SCMEvents.REPLICATE_CONTAINER, + new ReplicationRequest(1l, (short) 2, System.currentTimeMillis(), + (short) 3)); + + Thread.sleep(500L); + + queue.processAll(1000L); + + Assert.assertEquals(1, trackReplicationEvents.size()); + Assert.assertEquals(1, copyEvents.size()); + + Assert.assertEquals(trackReplicationEvents.get(0).getId(), + copyEvents.get(0).getCommand().getId()); + + //event is timed out + Thread.sleep(1500); + + queue.processAll(1000L); + + //original copy command + retry + Assert.assertEquals(2, trackReplicationEvents.size()); + Assert.assertEquals(2, copyEvents.size()); + + } finally { + if (leaseManager != null) { + leaseManager.shutdown(); + } + } + } + + public static Pipeline createPipeline(Iterable ids) + throws IOException { + Objects.requireNonNull(ids, "ids == null"); + final Iterator i = ids.iterator(); + Preconditions.checkArgument(i.hasNext()); + final DatanodeDetails leader = i.next(); + String pipelineName = "TEST-" + UUID.randomUUID().toString().substring(3); + final Pipeline pipeline = + new Pipeline(leader.getUuidString(), LifeCycleState.OPEN, + ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineName); + pipeline.addMember(leader); + for (; i.hasNext(); ) { + pipeline.addMember(i.next()); + } + return pipeline; + } + +} \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationQueue.java similarity index 92% rename from hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java rename to hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationQueue.java index 6d74c683eeb..a593718f573 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationQueue.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.ozone.container.replication; +package org.apache.hadoop.hdds.scm.container.replication; import java.util.Random; import java.util.UUID; @@ -39,7 +39,7 @@ public class TestReplicationQueue { } @Test - public void testDuplicateAddOp() { + public void testDuplicateAddOp() throws InterruptedException { long contId = random.nextLong(); String nodeId = UUID.randomUUID().toString(); ReplicationRequest obj1, obj2, obj3; @@ -53,12 +53,12 @@ public class TestReplicationQueue { replicationQueue.add(obj3); Assert.assertEquals("Should add only 1 msg as second one is duplicate", 1, replicationQueue.size()); - ReplicationRequest temp = replicationQueue.poll(); + ReplicationRequest temp = replicationQueue.take(); Assert.assertEquals(temp, obj3); } @Test - public void testPollOp() { + public void testPollOp() throws InterruptedException { long contId = random.nextLong(); String nodeId = UUID.randomUUID().toString(); ReplicationRequest msg1, msg2, msg3, msg4, msg5; @@ -82,19 +82,19 @@ public class TestReplicationQueue { // Since Priority queue orders messages according to replication count, // message with lowest replication should be first ReplicationRequest temp; - temp = replicationQueue.poll(); + temp = replicationQueue.take(); Assert.assertEquals("Should have 2 objects", 2, replicationQueue.size()); Assert.assertEquals(temp, msg3); - temp = replicationQueue.poll(); + temp = replicationQueue.take(); Assert.assertEquals("Should have 1 objects", 1, replicationQueue.size()); Assert.assertEquals(temp, msg5); // Message 2 should be ordered before message 5 as both have same replication // number but message 2 has earlier timestamp. - temp = replicationQueue.poll(); + temp = replicationQueue.take(); Assert.assertEquals("Should have 0 objects", replicationQueue.size(), 0); Assert.assertEquals(temp, msg4); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java similarity index 93% rename from hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java rename to hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java index 5b1fd0f43a9..1423c999381 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java @@ -19,5 +19,5 @@ /** * SCM Testing and Mocking Utils. */ -package org.apache.hadoop.ozone.container.replication; +package org.apache.hadoop.hdds.scm.container.replication; // Test classes for Replication functionality. \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java index 651b77618e4..802f2ef05aa 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java @@ -30,6 +30,7 @@ import org.apache.hadoop.ozone.OzoneConsts; import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; import java.util.List; import java.util.Random; @@ -86,11 +87,11 @@ public class TestContainerPlacement { for (int x = 0; x < opsCount; x++) { long containerSize = random.nextInt(100) * OzoneConsts.GB; List nodesCapacity = - capacityPlacer.chooseDatanodes(nodesRequired, containerSize); + capacityPlacer.chooseDatanodes(new ArrayList<>(), nodesRequired, containerSize); assertEquals(nodesRequired, nodesCapacity.size()); List nodesRandom = - randomPlacer.chooseDatanodes(nodesRequired, containerSize); + randomPlacer.chooseDatanodes(nodesCapacity, nodesRequired, containerSize); // One fifth of all calls are delete if (x % 5 == 0) {