diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index dc4e673126b..b073d7b81d8 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -32,6 +32,8 @@ import org.apache.hadoop.ozone.container.common.statemachine.commandhandler .CommandDispatcher; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler .DeleteBlocksCommandHandler; +import org.apache.hadoop.ozone.container.common.statemachine.commandhandler + .ReplicateContainerCommandHandler; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.util.Time; @@ -95,6 +97,7 @@ public class DatanodeStateMachine implements Closeable { .addHandler(new CloseContainerCommandHandler()) .addHandler(new DeleteBlocksCommandHandler( container.getContainerManager(), conf)) + .addHandler(new ReplicateContainerCommandHandler()) .setConnectionManager(connectionManager) .setContainer(container) .setContext(context) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java new file mode 100644 index 00000000000..b4e83b7d40c --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; + +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; +import org.apache.hadoop.ozone.container.common.statemachine + .SCMConnectionManager; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Command handler to copy containers from sources. + */ +public class ReplicateContainerCommandHandler implements CommandHandler { + static final Logger LOG = + LoggerFactory.getLogger(ReplicateContainerCommandHandler.class); + + private int invocationCount; + + private long totalTime; + + @Override + public void handle(SCMCommand command, OzoneContainer container, + StateContext context, SCMConnectionManager connectionManager) { + LOG.warn("Replicate command is not yet handled"); + + } + + @Override + public SCMCommandProto.Type getCommandType() { + return Type.replicateContainerCommand; + } + + @Override + public int getInvocationCount() { + return this.invocationCount; + } + + @Override + public long getAverageRunTime() { + if (invocationCount > 0) { + return totalTime / invocationCount; + } + return 0; + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java index 1ee6375a562..260a245ceb3 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java @@ -39,6 +39,8 @@ import org.apache.hadoop.ozone.container.common.statemachine import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; +import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -196,6 +198,16 @@ public class HeartbeatEndpointTask } this.context.addCommand(closeContainer); break; + case replicateContainerCommand: + ReplicateContainerCommand replicateContainerCommand = + ReplicateContainerCommand.getFromProtobuf( + commandResponseProto.getReplicateContainerCommandProto()); + if (LOG.isDebugEnabled()) { + LOG.debug("Received SCM container replicate request for container {}", + replicateContainerCommand.getContainerID()); + } + this.context.addCommand(replicateContainerCommand); + break; default: throw new IllegalArgumentException("Unknown response : " + commandResponseProto.getCommandType().name()); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java new file mode 100644 index 00000000000..834318b1451 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.protocol.commands; + +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto + .Builder; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; +import org.apache.hadoop.hdds.scm.container.ContainerID; + +import com.google.common.base.Preconditions; + +/** + * SCM command to request replication of a container. + */ +public class ReplicateContainerCommand + extends SCMCommand { + + private final long containerID; + + private final List sourceDatanodes; + + public ReplicateContainerCommand(long containerID, + List sourceDatanodes) { + this.containerID = containerID; + this.sourceDatanodes = sourceDatanodes; + } + + @Override + public Type getType() { + return SCMCommandProto.Type.replicateContainerCommand; + } + + @Override + public byte[] getProtoBufMessage() { + return getProto().toByteArray(); + } + + public ReplicateContainerCommandProto getProto() { + Builder builder = ReplicateContainerCommandProto.newBuilder() + .setContainerID(containerID); + for (DatanodeDetails dd : sourceDatanodes) { + builder.addSources(dd.getProtoBufMessage()); + } + return builder.build(); + } + + public static ReplicateContainerCommand getFromProtobuf( + ReplicateContainerCommandProto protoMessage) { + Preconditions.checkNotNull(protoMessage); + + List datanodeDetails = + protoMessage.getSourcesList() + .stream() + .map(DatanodeDetails::getFromProtoBuf) + .collect(Collectors.toList()); + + return new ReplicateContainerCommand(protoMessage.getContainerID(), + datanodeDetails); + + } + + public long getContainerID() { + return containerID; + } + + public List getSourceDatanodes() { + return sourceDatanodes; + } +} diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto index f6aba05636c..54230c1e9fc 100644 --- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -172,6 +172,7 @@ message SCMCommandProto { deleteBlocksCommand = 2; closeContainerCommand = 3; deleteContainerCommand = 4; + replicateContainerCommand = 5; } // TODO: once we start using protoc 3.x, refactor this message using "oneof" required Type commandType = 1; @@ -179,6 +180,7 @@ message SCMCommandProto { optional DeleteBlocksCommandProto deleteBlocksCommandProto = 3; optional CloseContainerCommandProto closeContainerCommandProto = 4; optional DeleteContainerCommandProto deleteContainerCommandProto = 5; + optional ReplicateContainerCommandProto replicateContainerCommandProto = 6; } /** @@ -227,12 +229,20 @@ message CloseContainerCommandProto { } /** -This command asks the datanode to close a specific container. +This command asks the datanode to delete a specific container. */ message DeleteContainerCommandProto { required int64 containerID = 1; } +/** +This command asks the datanode to replicate a container from specific sources. +*/ +message ReplicateContainerCommandProto { + required int64 containerID = 1; + repeated DatanodeDetailsProto sources = 2; +} + /** * Protocol used from a datanode to StorageContainerManager. * diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java index 7d16161e595..eb5ce1a827f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; + import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; import org.apache.hadoop.hdds.protocol.proto @@ -62,6 +63,9 @@ import static org.apache.hadoop.hdds.protocol.proto import static org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMCommandProto .Type.deleteBlocksCommand; +import static org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type + .replicateContainerCommand; import static org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMCommandProto .Type.reregisterCommand; @@ -77,6 +81,7 @@ import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; +import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB; import org.apache.hadoop.ozone.protocolPB @@ -293,6 +298,12 @@ public class SCMDatanodeProtocolServer implements .setCloseContainerCommandProto( ((CloseContainerCommand) cmd).getProto()) .build(); + case replicateContainerCommand: + return builder + .setCommandType(replicateContainerCommand) + .setReplicateContainerCommandProto( + ((ReplicateContainerCommand)cmd).getProto()) + .build(); default: throw new IllegalArgumentException("Not implemented"); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerHandler.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerHandler.java new file mode 100644 index 00000000000..a5b101fa70b --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerHandler.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.concurrent.TimeoutException; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.client.rest.OzoneException; +import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; +import org.apache.hadoop.test.GenericTestUtils; + +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_CONTAINER_SIZE_GB; +import org.junit.Test; + +/** + * Tests the behavior of the datanode, when replicate container command is + * received. + */ +public class TestReplicateContainerHandler { + + @Test + public void test() throws IOException, TimeoutException, InterruptedException, + OzoneException { + + GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer + .captureLogs(ReplicateContainerCommandHandler.LOG); + + OzoneConfiguration conf = new OzoneConfiguration(); + conf.set(OZONE_SCM_CONTAINER_SIZE_GB, "1"); + MiniOzoneCluster cluster = + MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build(); + cluster.waitForClusterToBeReady(); + + DatanodeDetails datanodeDetails = + cluster.getHddsDatanodes().get(0).getDatanodeDetails(); + //send the order to replicate the container + cluster.getStorageContainerManager().getScmNodeManager() + .addDatanodeCommand(datanodeDetails.getUuid(), + new ReplicateContainerCommand(1L, + new ArrayList<>())); + + //TODO: here we test only the serialization/unserialization as + // the implementation is not yet done + GenericTestUtils + .waitFor(() -> logCapturer.getOutput().contains("not yet handled"), 500, + 5 * 1000); + + } + +} \ No newline at end of file