HDDS-192: Create new SCMCommand to request a replication of a container. Contributed by Elek Marton

Bharat Viswanadham 2018-06-25 21:12:05 -07:00
parent 35ec9401e8
commit 238fe00ad2
7 changed files with 269 additions and 1 deletion

DatanodeStateMachine.java

@@ -32,6 +32,8 @@ import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
.CommandDispatcher;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
.DeleteBlocksCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
.ReplicateContainerCommandHandler;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.Time;
@@ -95,6 +97,7 @@ public class DatanodeStateMachine implements Closeable {
.addHandler(new CloseContainerCommandHandler())
.addHandler(new DeleteBlocksCommandHandler(
container.getContainerManager(), conf))
.addHandler(new ReplicateContainerCommandHandler())
.setConnectionManager(connectionManager)
.setContainer(container)
.setContext(context)
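
The new ReplicateContainerCommandHandler is registered with the same CommandDispatcher as the existing close-container and delete-blocks handlers, so a ReplicateContainerCommand queued on the StateContext (see the HeartbeatEndpointTask change below) is routed to it by command type. A minimal sketch of that dispatch pattern (a simplified, hypothetical dispatcher written only for illustration; the real CommandDispatcher implementation is not shown in this diff):

// Simplified illustration of type-based dispatch; not the actual
// CommandDispatcher implementation.
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
import org.apache.hadoop.ozone.container.common.statemachine
    .SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;

public class SimpleCommandDispatcher {

  private final Map<Type, CommandHandler> handlers = new HashMap<>();

  // Mirrors the addHandler() calls above: one handler per command type.
  public SimpleCommandDispatcher addHandler(CommandHandler handler) {
    handlers.put(handler.getCommandType(), handler);
    return this;
  }

  // Routes an incoming command to the handler registered for its type.
  public void handle(SCMCommand command, OzoneContainer container,
      StateContext context, SCMConnectionManager connectionManager) {
    CommandHandler handler = handlers.get(command.getType());
    if (handler != null) {
      handler.handle(command, container, context, connectionManager);
    }
  }
}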

ReplicateContainerCommandHandler.java

@@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;

import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
import org.apache.hadoop.ozone.container.common.statemachine
    .SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Command handler to copy containers from sources.
 */
public class ReplicateContainerCommandHandler implements CommandHandler {

  static final Logger LOG =
      LoggerFactory.getLogger(ReplicateContainerCommandHandler.class);

  private int invocationCount;

  private long totalTime;

  @Override
  public void handle(SCMCommand command, OzoneContainer container,
      StateContext context, SCMConnectionManager connectionManager) {
    LOG.warn("Replicate command is not yet handled");
  }

  @Override
  public SCMCommandProto.Type getCommandType() {
    return Type.replicateContainerCommand;
  }

  @Override
  public int getInvocationCount() {
    return this.invocationCount;
  }

  @Override
  public long getAverageRunTime() {
    if (invocationCount > 0) {
      return totalTime / invocationCount;
    }
    return 0;
  }
}
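
In this commit the handler is a stub: handle() only logs a warning, and the invocationCount and totalTime fields backing getInvocationCount() and getAverageRunTime() are never updated. A hedged sketch of how that accounting could be wired up once real copying is implemented (an assumed pattern, not part of this change; it uses org.apache.hadoop.util.Time, which DatanodeStateMachine already imports, and would also need an import of ReplicateContainerCommand):

  // Assumed future shape of handle(); only the warning log exists in this
  // commit. Timing uses Time.monotonicNow() from org.apache.hadoop.util.Time.
  @Override
  public void handle(SCMCommand command, OzoneContainer container,
      StateContext context, SCMConnectionManager connectionManager) {
    long startTime = Time.monotonicNow();
    try {
      ReplicateContainerCommand replicateCommand =
          (ReplicateContainerCommand) command;
      // TODO: copy the container replicateCommand.getContainerID() from one
      // of replicateCommand.getSourceDatanodes().
      LOG.warn("Replicate command is not yet handled");
    } finally {
      totalTime += Time.monotonicNow() - startTime;
      invocationCount++;
    }
  }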

HeartbeatEndpointTask.java

@@ -39,6 +39,8 @@ import org.apache.hadoop.ozone.container.common.statemachine
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -196,6 +198,16 @@ public class HeartbeatEndpointTask
}
this.context.addCommand(closeContainer);
break;
case replicateContainerCommand:
ReplicateContainerCommand replicateContainerCommand =
ReplicateContainerCommand.getFromProtobuf(
commandResponseProto.getReplicateContainerCommandProto());
if (LOG.isDebugEnabled()) {
LOG.debug("Received SCM container replicate request for container {}",
replicateContainerCommand.getContainerID());
}
this.context.addCommand(replicateContainerCommand);
break;
default:
throw new IllegalArgumentException("Unknown response : "
+ commandResponseProto.getCommandType().name());

ReplicateContainerCommand.java

@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.protocol.commands;

import java.util.List;
import java.util.stream.Collectors;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto
    .Builder;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
import org.apache.hadoop.hdds.scm.container.ContainerID;

import com.google.common.base.Preconditions;

/**
 * SCM command to request replication of a container.
 */
public class ReplicateContainerCommand
    extends SCMCommand<ReplicateContainerCommandProto> {

  private final long containerID;

  private final List<DatanodeDetails> sourceDatanodes;

  public ReplicateContainerCommand(long containerID,
      List<DatanodeDetails> sourceDatanodes) {
    this.containerID = containerID;
    this.sourceDatanodes = sourceDatanodes;
  }

  @Override
  public Type getType() {
    return SCMCommandProto.Type.replicateContainerCommand;
  }

  @Override
  public byte[] getProtoBufMessage() {
    return getProto().toByteArray();
  }

  public ReplicateContainerCommandProto getProto() {
    Builder builder = ReplicateContainerCommandProto.newBuilder()
        .setContainerID(containerID);
    for (DatanodeDetails dd : sourceDatanodes) {
      builder.addSources(dd.getProtoBufMessage());
    }
    return builder.build();
  }

  public static ReplicateContainerCommand getFromProtobuf(
      ReplicateContainerCommandProto protoMessage) {
    Preconditions.checkNotNull(protoMessage);
    List<DatanodeDetails> datanodeDetails =
        protoMessage.getSourcesList()
            .stream()
            .map(DatanodeDetails::getFromProtoBuf)
            .collect(Collectors.toList());
    return new ReplicateContainerCommand(protoMessage.getContainerID(),
        datanodeDetails);
  }

  public long getContainerID() {
    return containerID;
  }

  public List<DatanodeDetails> getSourceDatanodes() {
    return sourceDatanodes;
  }
}
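
A short usage sketch of the command's proto round trip, as exercised indirectly by the heartbeat path above (the container ID and the empty source list are illustrative values only):

// Round-trip sketch: build a command, serialize it to its proto form, and
// rebuild it as would happen on the datanode side. Values are examples.
import java.util.Collections;

import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;

public class ReplicateCommandRoundTrip {
  public static void main(String[] args) {
    ReplicateContainerCommand original =
        new ReplicateContainerCommand(1L, Collections.emptyList());

    ReplicateContainerCommandProto proto = original.getProto();

    ReplicateContainerCommand parsed =
        ReplicateContainerCommand.getFromProtobuf(proto);

    System.out.println(parsed.getContainerID());      // 1
    System.out.println(parsed.getSourceDatanodes());  // []
  }
}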

StorageContainerDatanodeProtocol.proto

@@ -172,6 +172,7 @@ message SCMCommandProto {
deleteBlocksCommand = 2;
closeContainerCommand = 3;
deleteContainerCommand = 4;
replicateContainerCommand = 5;
}
// TODO: once we start using protoc 3.x, refactor this message using "oneof"
required Type commandType = 1;
@@ -179,6 +180,7 @@ message SCMCommandProto {
optional DeleteBlocksCommandProto deleteBlocksCommandProto = 3;
optional CloseContainerCommandProto closeContainerCommandProto = 4;
optional DeleteContainerCommandProto deleteContainerCommandProto = 5;
optional ReplicateContainerCommandProto replicateContainerCommandProto = 6;
}
/**
@@ -227,12 +229,20 @@ message CloseContainerCommandProto {
}

/**
This command asks the datanode to delete a specific container.
*/
message DeleteContainerCommandProto {
  required int64 containerID = 1;
}

/**
This command asks the datanode to replicate a container from specific sources.
*/
message ReplicateContainerCommandProto {
  required int64 containerID = 1;
  repeated DatanodeDetailsProto sources = 2;
}
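
On the wire, the replicate request is carried inside the SCMCommandProto envelope: commandType is set to replicateContainerCommand and the payload goes into the new optional field. A minimal Java sketch of assembling that envelope (it mirrors the conversion added to SCMDatanodeProtocolServer below; the container ID is illustrative and the repeated sources field is left empty):

// Builds the envelope the SCM sends in a heartbeat response; field values
// are illustrative only.
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;

public class BuildReplicateEnvelope {
  public static void main(String[] args) {
    ReplicateContainerCommandProto payload =
        ReplicateContainerCommandProto.newBuilder()
            .setContainerID(1L)   // required field
            .build();             // sources is repeated and may stay empty

    SCMCommandProto envelope = SCMCommandProto.newBuilder()
        .setCommandType(SCMCommandProto.Type.replicateContainerCommand)
        .setReplicateContainerCommandProto(payload)
        .build();

    System.out.println(envelope);
  }
}
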
/**
* Protocol used from a datanode to StorageContainerManager.
*

SCMDatanodeProtocolServer.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.hdds.protocol.proto
@@ -62,6 +63,9 @@ import static org.apache.hadoop.hdds.protocol.proto
import static org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto
.Type.deleteBlocksCommand;
import static org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type
.replicateContainerCommand;
import static org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto
.Type.reregisterCommand;
@@ -77,6 +81,7 @@ import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
import org.apache.hadoop.ozone.protocolPB
@@ -293,6 +298,12 @@ public class SCMDatanodeProtocolServer implements
.setCloseContainerCommandProto(
((CloseContainerCommand) cmd).getProto())
.build();
case replicateContainerCommand:
return builder
.setCommandType(replicateContainerCommand)
.setReplicateContainerCommandProto(
((ReplicateContainerCommand)cmd).getProto())
.build();
default:
throw new IllegalArgumentException("Not implemented");
}

TestReplicateContainerHandler.java

@@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;

import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.client.rest.OzoneException;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.test.GenericTestUtils;

import static org.apache.hadoop.hdds.scm.ScmConfigKeys
    .OZONE_SCM_CONTAINER_SIZE_GB;
import org.junit.Test;

/**
 * Tests the behavior of the datanode when a replicate container command is
 * received.
 */
public class TestReplicateContainerHandler {

  @Test
  public void test() throws IOException, TimeoutException, InterruptedException,
      OzoneException {

    GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
        .captureLogs(ReplicateContainerCommandHandler.LOG);

    OzoneConfiguration conf = new OzoneConfiguration();
    conf.set(OZONE_SCM_CONTAINER_SIZE_GB, "1");
    MiniOzoneCluster cluster =
        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
    cluster.waitForClusterToBeReady();

    DatanodeDetails datanodeDetails =
        cluster.getHddsDatanodes().get(0).getDatanodeDetails();

    // send the order to replicate the container
    cluster.getStorageContainerManager().getScmNodeManager()
        .addDatanodeCommand(datanodeDetails.getUuid(),
            new ReplicateContainerCommand(1L,
                new ArrayList<>()));

    // TODO: here we test only the serialization/deserialization as
    // the implementation is not yet done
    GenericTestUtils
        .waitFor(() -> logCapturer.getOutput().contains("not yet handled"), 500,
            5 * 1000);
  }
}