HDFS-12799. Ozone: SCM: Close containers: extend SCMCommandResponseProto with SCMCloseContainerCmdResponseProto. Contributed by Elek, Marton.

This commit is contained in:
Chen Liang 2017-12-19 11:30:18 -08:00 committed by Owen O'Malley
parent 966853894f
commit caeeb78ca4
7 changed files with 334 additions and 6 deletions

View File

@ -22,6 +22,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.client.OzoneClientUtils;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
.CloseContainerHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CommandDispatcher;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ContainerReportHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteBlocksCommandHandler;
@ -82,6 +84,7 @@ public class DatanodeStateMachine implements Closeable {
// trick.
commandDispatcher = CommandDispatcher.newBuilder()
.addHandler(new ContainerReportHandler())
.addHandler(new CloseContainerHandler())
.addHandler(new DeleteBlocksCommandHandler(
container.getContainerManager(), conf))
.setConnectionManager(connectionManager)

View File

@ -0,0 +1,112 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
import org.apache.hadoop.ozone.container.common.statemachine
.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCloseContainerCmdResponseProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.Type;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Container Report handler.
*/
public class CloseContainerHandler implements CommandHandler {
static final Logger LOG =
LoggerFactory.getLogger(CloseContainerHandler.class);
private int invocationCount;
private long totalTime;
/**
* Constructs a ContainerReport handler.
*/
public CloseContainerHandler() {
}
/**
* Handles a given SCM command.
*
* @param command - SCM Command
* @param container - Ozone Container.
* @param context - Current Context.
* @param connectionManager - The SCMs that we are talking to.
*/
@Override
public void handle(SCMCommand command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {
LOG.debug("Processing Close Container command.");
invocationCount++;
long startTime = Time.monotonicNow();
String containerName = "UNKNOWN";
try {
SCMCloseContainerCmdResponseProto
closeContainerProto =
SCMCloseContainerCmdResponseProto
.parseFrom(command.getProtoBufMessage());
containerName = closeContainerProto.getContainerName();
container.getContainerManager().closeContainer(containerName);
} catch (Exception e) {
LOG.error("Can't close container " + containerName, e);
} finally {
long endTime = Time.monotonicNow();
totalTime += endTime - startTime;
}
}
/**
* Returns the command type that this command handler handles.
*
* @return Type
*/
@Override
public Type getCommandType() {
return Type.closeContainerCommand;
}
/**
* Returns number of times this handler has been invoked.
*
* @return int
*/
@Override
public int getInvocationCount() {
return invocationCount;
}
/**
* Returns the average time this function takes to run.
*
* @return long
*/
@Override
public long getAverageRunTime() {
if (invocationCount > 0) {
return totalTime / invocationCount;
}
return 0;
}
}

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.ozone.container.common.statemachine
import org.apache.hadoop.ozone.container.common.statemachine
.EndpointStateMachine.EndPointStates;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
@ -166,6 +167,16 @@ public class HeartbeatEndpointTask
this.context.addCommand(db);
}
break;
case closeContainerCommand:
CloseContainerCommand closeContainer =
CloseContainerCommand.getFromProtobuf(
commandResponseProto.getCloseContainerProto());
if (LOG.isDebugEnabled()) {
LOG.debug("Received SCM container close request for container {}",
closeContainer.getContainerName());
}
this.context.addCommand(closeContainer);
break;
default:
throw new IllegalArgumentException("Unknown response : "
+ commandResponseProto.getCmdType().name());

View File

@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.protocol.commands;
import com.google.common.base.Preconditions;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCloseContainerCmdResponseProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.Type;
import static org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.Type.closeContainerCommand;
/**
* Asks datanode to close a container.
*/
public class CloseContainerCommand
extends SCMCommand<SCMCloseContainerCmdResponseProto> {
private String containerName;
public CloseContainerCommand(String containerName) {
this.containerName = containerName;
}
/**
* Returns the type of this command.
*
* @return Type
*/
@Override
public Type getType() {
return closeContainerCommand;
}
/**
* Gets the protobuf message of this object.
*
* @return A protobuf message.
*/
@Override
public byte[] getProtoBufMessage() {
return getProto().toByteArray();
}
public SCMCloseContainerCmdResponseProto getProto() {
return SCMCloseContainerCmdResponseProto.newBuilder()
.setContainerName(containerName).build();
}
public static CloseContainerCommand getFromProtobuf(
SCMCloseContainerCmdResponseProto closeContainerProto) {
Preconditions.checkNotNull(closeContainerProto);
return new CloseContainerCommand(closeContainerProto.getContainerName());
}
public String getContainerName() {
return containerName;
}
}

View File

@ -26,6 +26,7 @@ import com.google.common.cache.RemovalNotification;
import com.google.protobuf.BlockingService;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.io.IOUtils;
@ -34,13 +35,15 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.jmx.ServiceRuntimeInfoImpl;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.ozone.client.OzoneClientUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
import org.apache.hadoop.ozone.client.OzoneClientUtils;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
import org.apache.hadoop.ozone.common.Storage.StorageState;
import org.apache.hadoop.ozone.common.StorageInfo;
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
@ -88,7 +91,6 @@ import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB;
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.ozone.common.Storage.StorageState;
import org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.GenericOptionsParser;
@ -102,6 +104,7 @@ import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@ -112,10 +115,8 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.Collections;
import java.util.stream.Collectors;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
import static org.apache.hadoop.ozone.protocol.proto
.ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result;
import static org.apache.hadoop.scm.ScmConfigKeys
@ -580,6 +581,10 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
return builder.setCmdType(Type.deleteBlocksCommand)
.setDeleteBlocksProto(((DeleteBlocksCommand) cmd).getProto())
.build();
case closeContainerCommand:
return builder.setCmdType(Type.closeContainerCommand)
.setCloseContainerProto(((CloseContainerCommand)cmd).getProto())
.build();
default:
throw new IllegalArgumentException("Not implemented");
}

View File

@ -210,6 +210,13 @@ This command tells the data node to send in the container report when possible
message SendContainerReportProto {
}
/**
This command asks the datanode to close a specific container.
*/
message SCMCloseContainerCmdResponseProto {
required string containerName = 1;
}
/**
Type of commands supported by SCM to datanode protocol.
*/
@ -219,6 +226,7 @@ enum Type {
sendContainerReport = 4;
reregisterCommand = 5;
deleteBlocksCommand = 6;
closeContainerCommand = 7;
}
/*
@ -232,6 +240,7 @@ message SCMCommandResponseProto {
optional SCMReregisterCmdResponseProto reregisterProto = 6;
optional SCMDeleteBlocksCmdResponseProto deleteBlocksProto = 7;
required string datanodeUUID = 8;
optional SCMCloseContainerCmdResponseProto closeContainerProto = 9;
}

View File

@ -0,0 +1,113 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.ReplicationFactor;
import org.apache.hadoop.ozone.client.ReplicationType;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.rest.OzoneException;
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB;
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Test to behaviour of the datanode when recieve close container command.
*/
public class TestCloseContainerHandler {
@Test
public void test() throws IOException, TimeoutException, InterruptedException,
OzoneException {
//setup a cluster (1G free space is enough for a unit test)
OzoneConfiguration conf = new OzoneConfiguration();
conf.set(OZONE_SCM_CONTAINER_SIZE_GB, "1");
MiniOzoneClassicCluster cluster =
new MiniOzoneClassicCluster.Builder(conf).numDataNodes(1)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
cluster.waitOzoneReady();
//the easiest way to create an open container is creating a key
OzoneClientFactory.setConfiguration(conf);
OzoneClient client = OzoneClientFactory.getClient();
ObjectStore objectStore = client.getObjectStore();
objectStore.createVolume("test");
objectStore.getVolume("test").createBucket("test");
OzoneOutputStream key = objectStore.getVolume("test").getBucket("test")
.createKey("test", 1024, ReplicationType.STAND_ALONE,
ReplicationFactor.ONE);
key.write("test".getBytes());
key.close();
//get the name of a valid container
KsmKeyArgs keyArgs =
new KsmKeyArgs.Builder().setVolumeName("test").setBucketName("test")
.setType(OzoneProtos.ReplicationType.STAND_ALONE)
.setFactor(OzoneProtos.ReplicationFactor.ONE).setDataSize(1024)
.setKeyName("test").build();
KsmKeyLocationInfo ksmKeyLocationInfo =
cluster.getKeySpaceManager().lookupKey(keyArgs).getKeyLocationVersions()
.get(0).getBlocksLatestVersionOnly().get(0);
String containerName = ksmKeyLocationInfo.getContainerName();
Assert.assertFalse(isContainerClosed(cluster, containerName));
//send the order to close the container
cluster.getStorageContainerManager().getScmNodeManager()
.addDatanodeCommand(cluster.getDataNodes().get(0).getDatanodeId(),
new CloseContainerCommand(containerName));
GenericTestUtils.waitFor(() -> isContainerClosed(cluster, containerName),
500,
5 * 1000);
//double check if it's really closed (waitFor also throws an exception)
Assert.assertTrue(isContainerClosed(cluster, containerName));
}
private Boolean isContainerClosed(MiniOzoneClassicCluster cluster,
String containerName) {
ContainerData containerData;
try {
containerData = cluster.getDataNodes().get(0).getOzoneContainerManager()
.getContainerManager().readContainer(containerName);
return !containerData.isOpen();
} catch (StorageContainerException e) {
throw new AssertionError(e);
}
}
}