diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
index 2cc12f4cfc5..ca81324f3f6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
 import org.apache.hadoop.scm.XceiverClientManager;
 import org.apache.hadoop.scm.XceiverClientSpi;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
@@ -37,8 +38,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Set;
 
 /**
  * Maintaining a list of ChunkInputStream. Write based on offset.
@@ -60,10 +59,6 @@ public class ChunkGroupOutputStream extends OutputStream {
   private long totalSize;
   private long byteOffset;
 
-  //This has to be removed once HDFS-11888 is resolved.
-  //local cache which will have list of created container names.
-  private static Set<String> containersCreated = new HashSet<>();
-
   public ChunkGroupOutputStream() {
     this.streamEntries = new ArrayList<>();
     this.currentStreamIndex = 0;
@@ -293,27 +288,21 @@ public class ChunkGroupOutputStream extends OutputStream {
     XceiverClientSpi xceiverClient =
         xceiverClientManager.acquireClient(pipeline);
     // create container if needed
-    // TODO : should be subKeyInfo.getShouldCreateContainer(), but for now
-    //The following change has to reverted once HDFS-11888 is fixed.
-    if(!containersCreated.contains(containerName)) {
-      synchronized (containerName.intern()) {
-        //checking again, there is a chance that some other thread has
-        // created it.
-        if (!containersCreated.contains(containerName)) {
-          LOG.debug("Need to create container {}.", containerName);
-          try {
-            ContainerProtocolCalls.createContainer(xceiverClient, requestId);
-          } catch (StorageContainerException ex) {
-            if (ex.getResult().equals(Result.CONTAINER_EXISTS)) {
-              //container already exist.
-              LOG.debug("Container {} already exists.", containerName);
-            } else {
-              LOG.error("Container creation failed for {}.",
-                  containerName, ex);
-              throw ex;
-            }
-          }
-          containersCreated.add(containerName);
+    if (subKeyInfo.getShouldCreateContainer()) {
+      try {
+        // The block manager sets the container creation stage to begin.
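+        // Note (flow inferred from this patch, not an authoritative comment):
+        // SCM's block manager already fired BEGIN_CREATE for this container
+        // when it allocated the block (ALLOCATED -> CREATING), so the client
+        // only reports the 'complete' stage below; sending 'begin' again here
+        // would be rejected as an invalid state transition.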
+        ContainerProtocolCalls.createContainer(xceiverClient, requestId);
+        storageContainerLocationClient.notifyObjectCreationStage(
+            NotifyObjectCreationStageRequestProto.Type.container,
+            containerName,
+            NotifyObjectCreationStageRequestProto.Stage.complete);
+      } catch (StorageContainerException ex) {
+        if (ex.getResult().equals(Result.CONTAINER_EXISTS)) {
+          // container already exists; this should never happen
+          LOG.debug("Container {} already exists.", containerName);
+        } else {
+          LOG.error("Container creation failed for {}.", containerName, ex);
+          throw ex;
         }
       }
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
index 73d81f811d3..0b081a11722 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
@@ -187,7 +187,7 @@ public final class ScmConfigKeys {
   public static final String OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE =
       "ozone.scm.container.provision_batch_size";
-  public static final int OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT = 1;
+  public static final int OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT = 5;
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java
index a90cff47873..3c1a2669005 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.scm.client;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerData;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadContainerResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
 import org.apache.hadoop.scm.XceiverClientSpi;
 import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.scm.XceiverClientManager;
@@ -86,12 +87,20 @@ public class ContainerOperationClient implements ScmClient {
       client = xceiverClientManager.acquireClient(pipeline);
       String traceID = UUID.randomUUID().toString();
+      storageContainerLocationClient.notifyObjectCreationStage(
+          NotifyObjectCreationStageRequestProto.Type.container,
+          containerId,
+          NotifyObjectCreationStageRequestProto.Stage.begin);
       ContainerProtocolCalls.createContainer(client, traceID);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Created container " + containerId
             + " leader:" + pipeline.getLeader()
             + " machines:" + pipeline.getMachines());
       }
+      storageContainerLocationClient.notifyObjectCreationStage(
+          NotifyObjectCreationStageRequestProto.Type.container,
+          containerId,
+          NotifyObjectCreationStageRequestProto.Stage.complete);
       return pipeline;
     } finally {
       if (client != null) {
@@ -116,11 +125,21 @@ public class ContainerOperationClient implements ScmClient {
       // connect to pipeline leader and allocate container on leader datanode.
       client = xceiverClientManager.acquireClient(pipeline);
       String traceID = UUID.randomUUID().toString();
+      storageContainerLocationClient.notifyObjectCreationStage(
+          NotifyObjectCreationStageRequestProto.Type.container,
+          containerId,
+          NotifyObjectCreationStageRequestProto.Stage.begin);
+
       ContainerProtocolCalls.createContainer(client, traceID);
       LOG.info("Created container " + containerId +
           " leader:" + pipeline.getLeader() +
           " machines:" + pipeline.getMachines() +
           " replication factor:" + factor);
+
+      storageContainerLocationClient.notifyObjectCreationStage(
+          NotifyObjectCreationStageRequestProto.Type.container,
+          containerId,
+          NotifyObjectCreationStageRequestProto.Stage.complete);
       return pipeline;
     } finally {
       if (client != null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/BlockContainerInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/BlockContainerInfo.java
new file mode 100644
index 00000000000..73c88147e41
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/BlockContainerInfo.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.scm.container.common.helpers;
+
+/**
+ * Class wraps container + allocated info for containers managed by the
+ * block service.
+ */
+public class BlockContainerInfo extends ContainerInfo {
+  private long allocated;
+
+  public BlockContainerInfo(ContainerInfo container, long used) {
+    super(container);
+    this.allocated = used;
+  }
+
+  public long addAllocated(long size) {
+    allocated += size;
+    return allocated;
+  }
+
+  public long subtractAllocated(long size) {
+    allocated -= size;
+    return allocated;
+  }
+
+  public long getAllocated() {
+    return this.allocated;
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/ContainerInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/ContainerInfo.java
new file mode 100644
index 00000000000..16bc0db177e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/ContainerInfo.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.scm.container.common.helpers;
+
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+import org.apache.hadoop.util.Time;
+
+/**
+ * Class wraps ozone container info.
+ */
+public class ContainerInfo {
+  private OzoneProtos.LifeCycleState state;
+  private Pipeline pipeline;
+  // The time (in ms, from a monotonic clock) at which the container entered
+  // its current state.
+  private long stateEnterTime;
+
+  ContainerInfo(OzoneProtos.LifeCycleState state, Pipeline pipeline,
+      long stateEnterTime) {
+    this.pipeline = pipeline;
+    this.state = state;
+    this.stateEnterTime = stateEnterTime;
+  }
+
+  public ContainerInfo(ContainerInfo container) {
+    this.pipeline = container.getPipeline();
+    this.state = container.getState();
+    this.stateEnterTime = container.getStateEnterTime();
+  }
+
+  /**
+   * Update the current container state and set the state enter time to now.
+   * @param state the new lifecycle state
+   */
+  public void setState(OzoneProtos.LifeCycleState state) {
+    this.state = state;
+    this.stateEnterTime = Time.monotonicNow();
+  }
+
+  public OzoneProtos.LifeCycleState getState() {
+    return state;
+  }
+
+  public long getStateEnterTime() {
+    return stateEnterTime;
+  }
+
+  public Pipeline getPipeline() {
+    return pipeline;
+  }
+
+  public OzoneProtos.SCMContainerInfo getProtobuf() {
+    OzoneProtos.SCMContainerInfo.Builder builder =
+        OzoneProtos.SCMContainerInfo.newBuilder();
+    builder.setPipeline(getPipeline().getProtobufMessage());
+    builder.setState(state);
+    builder.setStateEnterTime(stateEnterTime);
+    return builder.build();
+  }
+
+  public static ContainerInfo fromProtobuf(
+      OzoneProtos.SCMContainerInfo info) {
+    ContainerInfo.Builder builder = new ContainerInfo.Builder();
+    builder.setPipeline(Pipeline.getFromProtoBuf(info.getPipeline()));
+    builder.setState(info.getState());
+    builder.setStateEnterTime(info.getStateEnterTime());
+    return builder.build();
+  }
+
+  public static class Builder {
+    private OzoneProtos.LifeCycleState state;
+    private Pipeline pipeline;
+    private long stateEnterTime;
+
+    public Builder setState(OzoneProtos.LifeCycleState state) {
+      this.state = state;
+      return this;
+    }
+
+    public Builder setPipeline(Pipeline pipeline) {
+      this.pipeline = pipeline;
+      return this;
+    }
+
+    public Builder setStateEnterTime(long stateEnterTime) {
+      this.stateEnterTime = stateEnterTime;
+      return this;
+    }
+
+    public ContainerInfo build() {
+      return new ContainerInfo(state, pipeline, stateEnterTime);
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java
index ea0893eef3f..94134d63403 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.EnumSet;
 import java.util.List;
 
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
@@ -35,8 +36,8 @@ public interface StorageContainerLocationProtocol {
    *
    */
   Pipeline allocateContainer(OzoneProtos.ReplicationType replicationType,
-      OzoneProtos.ReplicationFactor factor,
-      String containerName) throws IOException;
+      OzoneProtos.ReplicationFactor factor, String containerName)
+      throws IOException;
 
   /**
    * Ask SCM the location of the container. SCM responds with a group of
@@ -85,6 +86,18 @@ public interface StorageContainerLocationProtocol {
   OzoneProtos.NodePool queryNode(EnumSet<OzoneProtos.NodeState> nodeStatuses,
       OzoneProtos.QueryScope queryScope, String poolName) throws IOException;
 
+  /**
+   * Notification from the client when it begins or finishes creating
+   * objects such as pipelines or containers on datanodes.
+   * The container will be in the OPEN state after creation completes.
+   * @param type object type
+   * @param name object name
+   * @param stage creation stage
+   */
+  void notifyObjectCreationStage(
+      NotifyObjectCreationStageRequestProto.Type type, String name,
+      NotifyObjectCreationStageRequestProto.Stage stage) throws IOException;
+
   /**
    * Creates a replication pipeline of a specified type.
    * @param type - replication type
@@ -95,5 +108,4 @@ public interface StorageContainerLocationProtocol {
   Pipeline createReplicationPipeline(OzoneProtos.ReplicationType type,
       OzoneProtos.ReplicationFactor factor, OzoneProtos.NodePool nodePool)
       throws IOException;
-
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index 93cd0cf00b5..8dc1c6cb633 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -35,9 +35,9 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolPr
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ListContainerResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryRequestProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineRequestProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto;
-
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 
 import java.io.Closeable;
@@ -207,6 +207,32 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
   }
 
+  /**
+   * Notification from the client that creates the object on datanodes.
+   * @param type object type
+   * @param name object name
+   * @param stage object creation stage: begin/complete
+   */
+  @Override
+  public void notifyObjectCreationStage(
+      NotifyObjectCreationStageRequestProto.Type type,
+      String name,
+      NotifyObjectCreationStageRequestProto.Stage stage) throws IOException {
+    Preconditions.checkState(!Strings.isNullOrEmpty(name),
+        "Object name cannot be null or empty");
+    NotifyObjectCreationStageRequestProto request =
+        NotifyObjectCreationStageRequestProto.newBuilder()
+            .setType(type)
+            .setName(name)
+            .setStage(stage)
+            .build();
+    try {
+      rpcProxy.notifyObjectCreationStage(NULL_RPC_CONTROLLER, request);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
   /**
    * Creates a replication pipeline of a specified type.
    *
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto
index 50926c2cea8..36c3736d05d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto
@@ -35,7 +35,7 @@ message Pipeline {
     required string leaderID = 1;
     repeated DatanodeIDProto members = 2;
     required string containerName = 3;
-    optional LifeCycleStates state = 4 [default = OPERATIONAL];
+    optional LifeCycleState state = 4 [default = OPEN];
 }
 
 message KeyValue {
@@ -72,6 +72,45 @@ message NodePool {
     repeated Node nodes = 1;
 }
 
+/**
+ * LifeCycleState for the SCM object creation state machine:
+ * ->Allocated: allocated on SCM, but the client has not started creating it yet.
+ * ->Creating: allocated and assigned to a client to create, but not ack-ed yet.
+ * ->Open: allocated on SCM, created on datanodes, and ack-ed by a client.
+ * ->Closed: container closed because its space is fully used, or on error.
+ * ->Timeout: container creation on datanodes failed or was never ack-ed by the client.
+ * ->Deleting (TBD): container will be deleted after a timeout.
+ * 1. ALLOCATED containers on SCM can't serve key/block related operations
+ *    until ACK-ed explicitly, which changes the state to OPEN.
+ * 2. Only OPEN/CLOSED containers can serve key/block related operations.
+ * 3. ALLOCATED containers that are not ACK-ed in time will TIMEOUT and be
+ *    CLEANUP-ed asynchronously.
+ */
+
+enum LifeCycleState {
+    ALLOCATED = 1;
+    CREATING = 2; // Used for containers allocated/created by a different client.
+    OPEN = 3; // Mostly an update to SCM via HB or client call.
+    CLOSED = 4; // !!States after this one have not been used yet.
+    DELETING = 5;
+    DELETED = 6; // object is deleted.
+}
+
+enum LifeCycleEvent {
+    BEGIN_CREATE = 1; // A request to a client to create this object.
+    COMPLETE_CREATE = 2;
+    CLOSE = 3; // !!Events after this one have not been used yet.
+    UPDATE = 4;
+    TIMEOUT = 5; // creation has timed out from SCM's view.
+    DELETE = 6;
+    CLEANUP = 7;
+}
+
+message SCMContainerInfo {
+    required LifeCycleState state = 1;
+    required Pipeline pipeline = 2;
+    optional int64 stateEnterTime = 3;
+}
 
 enum ReplicationType {
     RATIS = 1;
@@ -79,15 +118,7 @@ enum ReplicationType {
     CHAINED = 3;
 }
 
-
 enum ReplicationFactor {
     ONE = 1;
     THREE = 3;
-}
-
-enum LifeCycleStates {
-    CLIENT_CREATE = 1; // A request to client to create this object
-    OPERATIONAL = 2; // Mostly an update to SCM via HB or client call.
-    TIMED_OUT = 3; // creation has timed out from SCM's View.
-    DELETED = 4; // object is deleted.
-}
\ No newline at end of file
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto
index 30c716605d3..550f6a6b70d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto
@@ -82,6 +82,24 @@ message DeleteContainerResponseProto {
   // Empty response
 }
 
+message NotifyObjectCreationStageRequestProto {
+  enum Type {
+    container = 1;
+    pipeline = 2;
+  }
+  enum Stage {
+    begin = 1;
+    complete = 2;
+  }
+  required string name = 1;
+  required Type type = 2;
+  required Stage stage = 3;
+}
+
+message NotifyObjectCreationStageResponseProto {
+  // Empty response
+}
+
 /*
  NodeQueryRequest sends a request to SCM asking to send a list of nodes that
  match the NodeState that we are requesting.
@@ -160,7 +178,12 @@ service StorageContainerLocationProtocolService {
   */
   rpc queryNode(NodeQueryRequestProto) returns (NodeQueryResponseProto);
 
-  /*
+  /**
+   * Notification from the client when it begins or finishes creating a
+   * container or pipeline on datanodes.
+   */
+  rpc notifyObjectCreationStage(NotifyObjectCreationStageRequestProto) returns (NotifyObjectCreationStageResponseProto);
+
+  /*
    * Apis that Manage Pipelines.
    *
    * Pipelines are abstractions offered by SCM and Datanode that allows users
@@ -175,5 +198,4 @@ service StorageContainerLocationProtocolService {
   */
   rpc allocatePipeline(PipelineRequestProto) returns (PipelineResponseProto);
-
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 628de42c471..f12aafbb13c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerLocationProtocolProtos;
 import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
 
-import static org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerRequestProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerResponseProto;
@@ -39,9 +39,10 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolPr
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.DeleteContainerResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ListContainerResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ListContainerRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineRequestProto;
-
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
@@ -159,6 +160,19 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
   }
 
+  @Override
+  public NotifyObjectCreationStageResponseProto notifyObjectCreationStage(
+      RpcController controller, NotifyObjectCreationStageRequestProto request)
+      throws ServiceException {
+    try {
+      impl.notifyObjectCreationStage(request.getType(), request.getName(),
+          request.getStage());
+      return NotifyObjectCreationStageResponseProto.newBuilder().build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
   @Override
   public PipelineResponseProto allocatePipeline(
       RpcController controller, PipelineRequestProto request)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
index e320983f89f..fa14ad0a0ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolPr
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SendContainerReportProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
 import org.apache.hadoop.ozone.protocolPB.ScmBlockLocationProtocolServerSideTranslatorPB;
 import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
 import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolServerSideTranslatorPB;
@@ -65,6 +66,7 @@ import org.apache.hadoop.ozone.scm.exceptions.SCMException;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
 import org.apache.hadoop.ozone.scm.node.SCMNodeManager;
 import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
@@ -373,7 +375,7 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
   @Override
   public Pipeline getContainer(String containerName) throws IOException {
     checkAdminAccess();
-    return scmContainerManager.getContainer(containerName);
+    return scmContainerManager.getContainer(containerName).getPipeline();
   }
 
   /**
@@ -424,6 +426,34 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
     return poolBuilder.build();
   }
 
+  /**
+   * Notification from the client when it begins/finishes creating
+   * container/pipeline objects on datanodes.
+   * @param type object type (container or pipeline)
+   * @param name object name
+   * @param stage creation stage (begin or complete)
+   */
+  @Override
+  public void notifyObjectCreationStage(
+      NotifyObjectCreationStageRequestProto.Type type, String name,
+      NotifyObjectCreationStageRequestProto.Stage stage) throws IOException {
+
+    if (type == NotifyObjectCreationStageRequestProto.Type.container) {
+      ContainerInfo info = scmContainerManager.getContainer(name);
+      LOG.info("Container {} current state {} new stage {}", name,
+          info.getState(), stage);
+      if (stage == NotifyObjectCreationStageRequestProto.Stage.begin) {
+        scmContainerManager.updateContainerState(name,
+            OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
+      } else {
+        scmContainerManager.updateContainerState(name,
+            OzoneProtos.LifeCycleEvent.COMPLETE_CREATE);
+      }
+    } else if (type == NotifyObjectCreationStageRequestProto.Type.pipeline) {
+      // TODO: pipeline state updates will be addressed in a future patch.
+    }
+  }
+
   /**
    * Creates a replication pipeline of a specified type.
    */
@@ -503,7 +533,7 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
    *
    * @param containerName - Name of the container.
    * @param replicationFactor - replication factor.
-   * @return Pipeline.
+   * @return pipeline
    * @throws IOException
    */
   @Override
@@ -512,7 +542,7 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
       throws IOException {
     checkAdminAccess();
     return scmContainerManager.allocateContainer(replicationType,
-        replicationFactor, containerName);
+        replicationFactor, containerName).getPipeline();
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
index e000ccc9d0c..57305896f82 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.scm.block;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.metrics2.util.MBeans;
@@ -29,6 +30,8 @@ import org.apache.hadoop.ozone.scm.node.NodeManager;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
 import org.apache.hadoop.scm.ScmConfigKeys;
 import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.scm.container.common.helpers.BlockContainerInfo;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.utils.BatchOperation;
@@ -83,8 +86,11 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
   private final long containerSize;
   private final long cacheSize;
 
-  private final MetadataStore openContainerStore;
-  private Map<String, Long> openContainers;
+  // Track all containers owned by the block service.
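+  // A sketch of the intended shape of the two-level map below (illustrative
+  // values only):
+  //   ALLOCATED -> { "c1" -> info1, "c2" -> info2 }
+  //   OPEN      -> { "c3" -> info3 }
+  // Moving a container between lifecycle states removes it from one inner
+  // map and inserts it into another; see updateContainer() further down.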
+  private final MetadataStore containerStore;
+
+  private Map<OzoneProtos.LifeCycleState,
+      Map<String, BlockContainerInfo>> containers;
 
   private final int containerProvisionBatchSize;
   private final Random rand;
   private final ObjectName mxBean;
@@ -121,14 +127,13 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
 
     // Load store of all open contains for block allocation
     File openContainsDbPath = new File(scmMetaDataDir, OPEN_CONTAINERS_DB);
-    openContainerStore = MetadataStoreBuilder.newBuilder()
+    containerStore = MetadataStoreBuilder.newBuilder()
         .setConf(conf)
         .setDbFile(openContainsDbPath)
        .setCacheSize(this.cacheSize * OzoneConsts.MB)
        .build();
 
-    openContainers = new ConcurrentHashMap<>();
-    loadOpenContainers();
+    loadAllocatedContainers();
 
     this.containerProvisionBatchSize = conf.getInt(
         ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE,
@@ -141,20 +146,39 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
   // TODO: close full (or almost full) containers with a separate thread.
   /**
-   * Load open containers from persistent store.
+   * Load allocated containers from the persistent store.
    * @throws IOException
    */
-  private void loadOpenContainers() throws IOException {
+  private void loadAllocatedContainers() throws IOException {
+    // Pre-allocate an empty map entry per state to avoid null checks.
+    containers = new ConcurrentHashMap<>();
+    for (OzoneProtos.LifeCycleState state :
+        OzoneProtos.LifeCycleState.values()) {
+      containers.put(state, new ConcurrentHashMap<String, BlockContainerInfo>());
+    }
     try {
-      openContainerStore.iterate(null, (key, value) -> {
+      containerStore.iterate(null, (key, value) -> {
         try {
           String containerName = DFSUtil.bytes2String(key);
           Long containerUsed = Long.parseLong(DFSUtil.bytes2String(value));
-          openContainers.put(containerName, containerUsed);
-          LOG.debug("Loading open container: {} used : {}", containerName,
-              containerUsed);
+          ContainerInfo containerInfo =
+              containerManager.getContainer(containerName);
+          // TODO: remove the container from the block manager's container DB.
+          // Most likely the allocated container timed out and was cleaned up
+          // by SCM; we should clean up correspondingly instead of just
+          // skipping it.
+          if (containerInfo == null) {
+            LOG.warn("Container {} allocated by the block service " +
+                "can't be found in SCM", containerName);
+            return true;
+          }
+          Map<String, BlockContainerInfo> containersByState =
+              containers.get(containerInfo.getState());
+          containersByState.put(containerName,
+              new BlockContainerInfo(containerInfo, containerUsed));
+          LOG.debug("Loading allocated container: {} used : {} state: {}",
+              containerName, containerUsed, containerInfo.getState());
         } catch (Exception e) {
-          LOG.warn("Failed loading open container, continue next...");
+          LOG.warn("Failed loading an allocated container, continuing with the next...");
         }
         return true;
       });
@@ -166,25 +190,26 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
   }
 
   /**
-   * Pre-provision specified count of containers for block creation.
-   * @param count - number of containers to create.
-   * @return list of container names created.
+   * Pre-allocate a specified count of containers for block creation.
+   * @param count - number of containers to allocate.
+   * @return list of container names allocated.
    * @throws IOException
    */
-  private List<String> provisionContainers(int count) throws IOException {
+  private List<String> allocateContainers(int count) throws IOException {
    List<String> results = new ArrayList<>();
    lock.lock();
    try {
      for (int i = 0; i < count; i++) {
        String containerName = UUID.randomUUID().toString();
+        ContainerInfo containerInfo = null;
        try {
          // TODO: Fix this later when Ratis is made the Default.
-          Pipeline pipeline = containerManager.allocateContainer(
+          containerInfo = containerManager.allocateContainer(
              OzoneProtos.ReplicationType.STAND_ALONE,
              OzoneProtos.ReplicationFactor.ONE,
              containerName);
 
-          if (pipeline == null) {
+          if (containerInfo == null) {
            LOG.warn("Unable to allocate container.");
            continue;
          }
@@ -192,8 +217,12 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
          LOG.warn("Unable to allocate container: " + ex);
          continue;
        }
-        openContainers.put(containerName, 0L);
-        openContainerStore.put(DFSUtil.string2Bytes(containerName),
+        Map<String, BlockContainerInfo> containersByState =
+            containers.get(OzoneProtos.LifeCycleState.ALLOCATED);
+        Preconditions.checkNotNull(containersByState);
+        containersByState.put(containerName,
+            new BlockContainerInfo(containerInfo, 0));
+        containerStore.put(DFSUtil.string2Bytes(containerName),
            DFSUtil.string2Bytes(Long.toString(0L)));
        results.add(containerName);
      }
@@ -203,6 +232,76 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
    return results;
  }
 
+  /**
+   * Filter containers by state and size.
+   * @param state the state of the container.
+   * @param size the minimal available size of the container.
+   * @return allocated containers that satisfy both state and size.
+   */
+  private List<String> filterContainers(OzoneProtos.LifeCycleState state,
+      long size) {
+    Map<String, BlockContainerInfo> containersByState =
+        this.containers.get(state);
+    return containersByState.entrySet().parallelStream()
+        .filter(e -> ((e.getValue().getAllocated() + size < containerSize)))
+        .map(e -> e.getKey())
+        .collect(Collectors.toList());
+  }
+
+  private BlockContainerInfo getContainer(OzoneProtos.LifeCycleState state,
+      String name) {
+    Map<String, BlockContainerInfo> containersByState =
+        this.containers.get(state);
+    return containersByState.get(name);
+  }
+
+  // Relies on the caller, such as allocateBlock(), to hold the lock
+  // to keep the containers map consistent.
+  private void updateContainer(OzoneProtos.LifeCycleState oldState, String name,
+      OzoneProtos.LifeCycleState newState) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Update container {} from state {} to state {}",
+          name, oldState, newState);
+    }
+    Map<String, BlockContainerInfo> containersInOldState =
+        this.containers.get(oldState);
+    BlockContainerInfo containerInfo = containersInOldState.get(name);
+    Preconditions.checkNotNull(containerInfo);
+    containersInOldState.remove(name);
+    Map<String, BlockContainerInfo> containersInNewState =
+        this.containers.get(newState);
+    containersInNewState.put(name, containerInfo);
+  }
+
+  // Refresh containers that have been allocated.
+  // We may not need to track all the states; just creating/open/closed
+  // should be enough for now.
+  private void refreshContainers() {
+    Map<String, BlockContainerInfo> containersByState =
+        this.containers.get(OzoneProtos.LifeCycleState.ALLOCATED);
+    for (String containerName : containersByState.keySet()) {
+      try {
+        ContainerInfo containerInfo =
+            containerManager.getContainer(containerName);
+        if (containerInfo == null) {
+          // TODO: clean up containers that have been deleted on SCM but
+          // TODO: are still in ALLOCATED state in the block manager.
+ LOG.debug("Container {} allocated by block service" + + "can't be found in SCM", containerName); + continue; + } + if (containerInfo.getState() == OzoneProtos.LifeCycleState.OPEN) { + updateContainer(OzoneProtos.LifeCycleState.ALLOCATED, containerName, + containerInfo.getState()); + } + // TODO: check containers in other state and refresh as needed. + // TODO: ALLOCATED container that is timeout and DELETED. (unit test) + // TODO: OPEN container that is CLOSE. + } catch (IOException ex) { + LOG.debug("Failed to get container info for: {}", containerName); + } + } + } + /** * Allocates a new block for a given size. * @@ -215,8 +314,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean { */ @Override public AllocatedBlock allocateBlock(final long size) throws IOException { - boolean createContainer; - Pipeline pipeline; + boolean createContainer = false; if (size < 0 || size > containerSize) { throw new SCMException("Unsupported block size", INVALID_BLOCK_SIZE); @@ -228,37 +326,29 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean { lock.lock(); try { + refreshContainers(); List candidates; - if (openContainers.size() == 0) { - try { - candidates = provisionContainers(containerProvisionBatchSize); - } catch (IOException ex) { - throw new SCMException("Unable to allocate container for the block", - FAILED_TO_ALLOCATE_CONTAINER); - } - createContainer = true; - } else { - candidates = openContainers.entrySet().parallelStream() - .filter(e -> (e.getValue() + size < containerSize)) - .map(e -> e.getKey()) - .collect(Collectors.toList()); - createContainer = false; - } - + candidates = filterContainers(OzoneProtos.LifeCycleState.OPEN, size); if (candidates.size() == 0) { - try { - candidates = provisionContainers(containerProvisionBatchSize); - } catch (IOException ex) { - throw new SCMException("Unable to allocate container for the block", - FAILED_TO_ALLOCATE_CONTAINER); + candidates = filterContainers(OzoneProtos.LifeCycleState.ALLOCATED, + size); + if (candidates.size() == 0) { + try { + candidates = allocateContainers(containerProvisionBatchSize); + } catch (IOException ex) { + LOG.error("Unable to allocate container for the block."); + throw new SCMException("Unable to allocate container for the block", + FAILED_TO_ALLOCATE_CONTAINER); + } + } + // now we should have some candidates in ALLOCATE state + if (candidates.size() == 0) { + throw new SCMException("Fail to find any container to allocate block " + + "of size " + size + ".", FAILED_TO_FIND_CONTAINER_WITH_SAPCE); } } - if (candidates.size() == 0) { - throw new SCMException("Fail to find any container to allocate block " + - "of size " + size + ".", FAILED_TO_FIND_CONTAINER_WITH_SAPCE); - } - + // Candidates list now should include only ALLOCATE or OPEN containers int randomIdx = rand.nextInt(candidates.size()); String containerName = candidates.get(randomIdx); if (LOG.isDebugEnabled()) { @@ -266,28 +356,46 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean { candidates.toString(), containerName); } - pipeline = containerManager.getContainer(containerName); - if (pipeline == null) { + ContainerInfo containerInfo = + containerManager.getContainer(containerName); + if (containerInfo == null) { LOG.debug("Unable to find container for the block"); throw new SCMException("Unable to find container to allocate block", FAILED_TO_FIND_CONTAINER); } + if (LOG.isDebugEnabled()) { + LOG.debug("Candidate {} state {}", containerName, + containerInfo.getState()); + } + // 
Container must be either OPEN or ALLOCATE state + if (containerInfo.getState() == OzoneProtos.LifeCycleState.ALLOCATED) { + createContainer = true; + } + // TODO: make block key easier to debug (e.g., seq no) // Allocate key for the block String blockKey = UUID.randomUUID().toString(); AllocatedBlock.Builder abb = new AllocatedBlock.Builder() - .setKey(blockKey).setPipeline(pipeline) + .setKey(blockKey).setPipeline(containerInfo.getPipeline()) .setShouldCreateContainer(createContainer); - if (pipeline.getMachines().size() > 0) { + if (containerInfo.getPipeline().getMachines().size() > 0) { blockStore.put(DFSUtil.string2Bytes(blockKey), DFSUtil.string2Bytes(containerName)); // update the container usage information - Long newUsed = openContainers.get(containerName) + size; - openContainers.put(containerName, newUsed); - openContainerStore.put(DFSUtil.string2Bytes(containerName), - DFSUtil.string2Bytes(Long.toString(newUsed))); + BlockContainerInfo containerInfoUpdate = + getContainer(containerInfo.getState(), containerName); + Preconditions.checkNotNull(containerInfoUpdate); + containerInfoUpdate.addAllocated(size); + containerStore.put(DFSUtil.string2Bytes(containerName), + DFSUtil.string2Bytes(Long.toString(containerInfoUpdate.getAllocated()))); + if (createContainer) { + OzoneProtos.LifeCycleState newState = + containerManager.updateContainerState(containerName, + OzoneProtos.LifeCycleEvent.BEGIN_CREATE); + updateContainer(containerInfo.getState(), containerName, newState); + } return abb.build(); } } finally { @@ -312,8 +420,16 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean { throw new SCMException("Specified block key does not exist. key : " + key, FAILED_TO_FIND_BLOCK); } - return containerManager.getContainer( - DFSUtil.bytes2String(containerBytes)); + String containerName = DFSUtil.bytes2String(containerBytes); + ContainerInfo containerInfo = containerManager.getContainer( + containerName); + if (containerInfo == null) { + LOG.debug("Container {} allocated by block service" + + "can't be found in SCM", containerName); + throw new SCMException("Unable to find container for the block", + SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER); + } + return containerInfo.getPipeline(); } finally { lock.unlock(); } @@ -338,8 +454,9 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean { throw new SCMException("Specified block key does not exist. key : " + key, FAILED_TO_FIND_BLOCK); } + // TODO: track the block size info so that we can reclaim the container + // TODO: used space when the block is deleted. 
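+    // One possible shape for that TODO (hypothetical, not implemented in
+    // this patch): persist blockKey -> (containerName, blockSize) at
+    // allocation time, then on delete look up the BlockContainerInfo and
+    // call subtractAllocated(blockSize) before rewriting the containerStore
+    // entry, mirroring what allocateBlock() does with addAllocated().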
     BatchOperation batch = new BatchOperation();
-    containerManager.getContainer(DFSUtil.bytes2String(containerBytes));
     String deletedKeyName = getDeletedKeyName(key);
     // Add a tombstone for the deleted key
     batch.put(DFSUtil.string2Bytes(deletedKeyName), containerBytes);
@@ -370,8 +487,8 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
     if (blockStore != null) {
       blockStore.close();
     }
-    if (openContainerStore != null) {
-      openContainerStore.close();
+    if (containerStore != null) {
+      containerStore.close();
     }
 
     MBeans.unregister(mxBean);
@@ -379,6 +496,6 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
 
   @Override
   public int getOpenContainersNo() {
-    return openContainers.size();
+    return containers.get(OzoneProtos.LifeCycleState.OPEN).size();
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
index 9e4053e9fea..d674b64ec37 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.ozone.scm.cli;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.cli.BasicParser;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
@@ -33,8 +34,10 @@ import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.Buck
 import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo;
 import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeInfo;
 import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeList;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.Pipeline;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.utils.MetadataStore;
@@ -482,10 +485,14 @@ public class SQLCLI extends Configured implements Tool {
     HashSet<String> uuidChecked = new HashSet<>();
     dbStore.iterate(null, (key, value) -> {
       String containerName = new String(key, encoding);
-      Pipeline pipeline = null;
-      pipeline = Pipeline.parseFrom(value);
+      ContainerInfo containerInfo = null;
+      containerInfo = ContainerInfo.fromProtobuf(
+          OzoneProtos.SCMContainerInfo.PARSER.parseFrom(value));
+      Preconditions.checkNotNull(containerInfo);
       try {
-        insertContainerDB(conn, containerName, pipeline, uuidChecked);
+        // TODO: include the container state in the sqlite schema
+        insertContainerDB(conn, containerName,
+            containerInfo.getPipeline().getProtobufMessage(), uuidChecked);
         return true;
       } catch (SQLException e) {
         throw new IOException(e);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
index 8daa5d41b0d..3cf78d9733d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
@@ -20,13 +20,17 @@ package org.apache.hadoop.ozone.scm.container;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
+import org.apache.hadoop.ozone.common.statemachine.StateMachine;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.scm.exceptions.SCMException;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
 import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
 import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
 import org.apache.hadoop.utils.MetadataStore;
@@ -38,8 +42,10 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -60,6 +66,9 @@ public class ContainerMapping implements Mapping {
   private final MetadataStore containerStore;
   private final PipelineSelector pipelineSelector;
 
+  private final StateMachine<OzoneProtos.LifeCycleState,
+      OzoneProtos.LifeCycleEvent> stateMachine;
+
   /**
    * Constructs a mapping class that creates mapping between container names and
    * pipelines.
@@ -88,31 +97,80 @@ public class ContainerMapping implements Mapping {
         .build();
 
     this.lock = new ReentrantLock();
+
     this.pipelineSelector = new PipelineSelector(nodeManager, conf);
+
+    // Initialize the container state machine.
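+    // The StateMachine helper is not part of this diff; the code below
+    // assumes the usual table-driven semantics:
+    //   addTransition(from, to, event) registers (from, event) -> to, and
+    //   getNextState(current, event) looks up (current, event), throwing
+    //   InvalidStateTransitionException when no transition is registered.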
+    Set<OzoneProtos.LifeCycleState> finalStates = new HashSet<>();
+    finalStates.add(OzoneProtos.LifeCycleState.OPEN);
+    finalStates.add(OzoneProtos.LifeCycleState.CLOSED);
+    finalStates.add(OzoneProtos.LifeCycleState.DELETED);
+
+    this.stateMachine = new StateMachine<>(
+        OzoneProtos.LifeCycleState.ALLOCATED, finalStates);
+    initializeStateMachine();
   }
 
+  // Client-driven Create State Machine
+  // States: ------------->CREATING----------------->[OPEN]
+  // Events:  (BEGIN_CREATE)    |    (COMPLETE_CREATE)
+  //                            |
+  //                            |(TIMEOUT)
+  //                            V
+  //                         DELETING----------------->[DELETED]
+  //                                    (CLEANUP)
+
+  // SCM Open/Close State Machine
+  // States: OPEN------------------>[CLOSED]
+  // Events:        (CLOSE)
+
+  // Delete State Machine
+  // States: OPEN------------------>DELETING------------------>[DELETED]
+  // Events:        (DELETE)                  (CLEANUP)
+  private void initializeStateMachine() {
+    stateMachine.addTransition(OzoneProtos.LifeCycleState.ALLOCATED,
+        OzoneProtos.LifeCycleState.CREATING,
+        OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
+
+    stateMachine.addTransition(OzoneProtos.LifeCycleState.CREATING,
+        OzoneProtos.LifeCycleState.OPEN,
+        OzoneProtos.LifeCycleEvent.COMPLETE_CREATE);
+
+    stateMachine.addTransition(OzoneProtos.LifeCycleState.OPEN,
+        OzoneProtos.LifeCycleState.CLOSED,
+        OzoneProtos.LifeCycleEvent.CLOSE);
+
+    stateMachine.addTransition(OzoneProtos.LifeCycleState.OPEN,
+        OzoneProtos.LifeCycleState.DELETING,
+        OzoneProtos.LifeCycleEvent.DELETE);
+
+    stateMachine.addTransition(OzoneProtos.LifeCycleState.DELETING,
+        OzoneProtos.LifeCycleState.DELETED,
+        OzoneProtos.LifeCycleEvent.CLEANUP);
+
+    // Creating timeout -> Deleting
+    stateMachine.addTransition(OzoneProtos.LifeCycleState.CREATING,
+        OzoneProtos.LifeCycleState.DELETING,
+        OzoneProtos.LifeCycleEvent.TIMEOUT);
+  }
 
   /**
-   * Returns the Pipeline from the container name.
-   *
-   * @param containerName - Name
-   * @return - Pipeline that makes up this container.
+   * {@inheritDoc}
    */
   @Override
-  public Pipeline getContainer(final String containerName) throws IOException {
-    Pipeline pipeline;
+  public ContainerInfo getContainer(final String containerName) throws IOException {
+    ContainerInfo containerInfo;
     lock.lock();
     try {
-      byte[] pipelineBytes =
+      byte[] containerBytes =
           containerStore.get(containerName.getBytes(encoding));
-      if (pipelineBytes == null) {
+      if (containerBytes == null) {
         throw new SCMException("Specified key does not exist. key : " +
             containerName, SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
       }
-      pipeline = Pipeline.getFromProtoBuf(
-          OzoneProtos.Pipeline.PARSER.parseFrom(pipelineBytes));
-      return pipeline;
+      containerInfo = ContainerInfo.fromProtobuf(
+          OzoneProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes));
+      return containerInfo;
     } finally {
       lock.unlock();
     }
@@ -138,10 +196,13 @@ public class ContainerMapping implements Mapping {
           containerStore.getRangeKVs(startKey, count, prefixFilter);
 
       // Transform the values into the pipelines.
+      // TODO: return list of ContainerInfo instead of pipelines.
+      // TODO: filter by container state
       for (Map.Entry<byte[], byte[]> entry : range) {
-        Pipeline pipeline = Pipeline.getFromProtoBuf(
-            OzoneProtos.Pipeline.PARSER.parseFrom(entry.getValue()));
-        pipelineList.add(pipeline);
+        ContainerInfo containerInfo = ContainerInfo.fromProtobuf(
+            OzoneProtos.SCMContainerInfo.PARSER.parseFrom(entry.getValue()));
+        Preconditions.checkNotNull(containerInfo);
+        pipelineList.add(containerInfo.getPipeline());
       }
     } finally {
       lock.unlock();
@@ -158,12 +219,12 @@ public class ContainerMapping implements Mapping {
    * @throws IOException - Exception
    */
   @Override
-  public Pipeline allocateContainer(OzoneProtos.ReplicationType type,
+  public ContainerInfo allocateContainer(OzoneProtos.ReplicationType type,
       OzoneProtos.ReplicationFactor replicationFactor,
       final String containerName) throws IOException {
     Preconditions.checkNotNull(containerName);
     Preconditions.checkState(!containerName.isEmpty());
-    Pipeline pipeline = null;
+    ContainerInfo containerInfo = null;
     if (!nodeManager.isOutOfNodeChillMode()) {
       throw new SCMException("Unable to create container while in chill mode",
           SCMException.ResultCodes.CHILL_MODE_EXCEPTION);
@@ -171,20 +232,25 @@ public class ContainerMapping implements Mapping {
 
     lock.lock();
     try {
-      byte[] pipelineBytes =
+      byte[] containerBytes =
           containerStore.get(containerName.getBytes(encoding));
-      if (pipelineBytes != null) {
+      if (containerBytes != null) {
         throw new SCMException("Specified container already exists. key : " +
             containerName, SCMException.ResultCodes.CONTAINER_EXISTS);
       }
-      pipeline = pipelineSelector.getReplicationPipeline(type,
+      Pipeline pipeline = pipelineSelector.getReplicationPipeline(type,
           replicationFactor, containerName);
+      containerInfo = new ContainerInfo.Builder()
+          .setState(OzoneProtos.LifeCycleState.ALLOCATED)
+          .setPipeline(pipeline)
+          .setStateEnterTime(Time.monotonicNow())
+          .build();
       containerStore.put(containerName.getBytes(encoding),
-          pipeline.getProtobufMessage().toByteArray());
+          containerInfo.getProtobuf().toByteArray());
     } finally {
       lock.unlock();
     }
-    return pipeline;
+    return containerInfo;
   }
 
   /**
@@ -199,9 +265,9 @@ public class ContainerMapping implements Mapping {
     lock.lock();
     try {
       byte[] dbKey = containerName.getBytes(encoding);
-      byte[] pipelineBytes =
-          containerStore.get(dbKey);
-      if (pipelineBytes == null) {
+      byte[] containerBytes =
+          containerStore.get(dbKey);
+      if (containerBytes == null) {
         throw new SCMException("Failed to delete container " +
             containerName + ", reason : container doesn't exist.",
             SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
@@ -212,6 +278,44 @@ public class ContainerMapping implements Mapping {
     }
   }
 
+  /**
+   * {@inheritDoc}
+   * Used by the client to update container state on SCM.
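+   * For example (illustrative sequence only), a client-driven create is
+   * expected to call updateContainerState(name, BEGIN_CREATE) when the
+   * block is allocated (ALLOCATED -> CREATING), and then
+   * updateContainerState(name, COMPLETE_CREATE) once the datanodes ack
+   * (CREATING -> OPEN); any other event in those states fails with
+   * FAILED_TO_CHANGE_CONTAINER_STATE.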
+   */
+  @Override
+  public OzoneProtos.LifeCycleState updateContainerState(String containerName,
+      OzoneProtos.LifeCycleEvent event) throws IOException {
+    ContainerInfo containerInfo;
+    lock.lock();
+    try {
+      byte[] dbKey = containerName.getBytes(encoding);
+      byte[] containerBytes =
+          containerStore.get(dbKey);
+      if (containerBytes == null) {
+        throw new SCMException("Failed to update container state of " +
+            containerName + ", reason : container doesn't exist.",
+            SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
+      }
+      containerInfo = ContainerInfo.fromProtobuf(
+          OzoneProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes));
+
+      OzoneProtos.LifeCycleState newState;
+      try {
+        newState = stateMachine.getNextState(containerInfo.getState(), event);
+      } catch (InvalidStateTransitionException ex) {
+        throw new SCMException("Failed to update container state of " +
+            containerName + ", reason : invalid state transition from state: " +
+            containerInfo.getState() + " upon event: " + event + ".",
+            SCMException.ResultCodes.FAILED_TO_CHANGE_CONTAINER_STATE);
+      }
+      containerInfo.setState(newState);
+      containerStore.put(dbKey, containerInfo.getProtobuf().toByteArray());
+      return newState;
+    } finally {
+      lock.unlock();
+    }
+  }
+
   /**
    * Closes this stream and releases any system resources associated with it. If
    * the stream is already closed then invoking this method has no effect.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
index 1ef35726d7e..7cf3f96a1c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.scm.container;
 
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 
 import java.io.Closeable;
@@ -30,13 +31,13 @@ import java.util.List;
 */
 public interface Mapping extends Closeable {
   /**
-   * Returns the Pipeline from the container name.
+   * Returns the ContainerInfo from the container name.
    *
    * @param containerName - Name
-   * @return - Pipeline that makes up this container.
+   * @return - ContainerInfo such as the creation state and the pipeline.
    * @throws IOException
   */
-  Pipeline getContainer(String containerName) throws IOException;
+  ContainerInfo getContainer(String containerName) throws IOException;
 
   /**
    * Returns pipelines under certain conditions.
@@ -57,16 +58,15 @@ public interface Mapping extends Closeable {
   List<Pipeline> listContainer(String startName, String prefixName, int count)
       throws IOException;
 
-
   /**
    * Allocates a new container for a given keyName and replication factor.
    *
    * @param containerName - Name.
    * @param replicationFactor - replication factor of the container.
-   * @return - Pipeline that makes up this container.
+   * @return - Container Info.
    * @throws IOException
    */
-  Pipeline allocateContainer(OzoneProtos.ReplicationType type,
+  ContainerInfo allocateContainer(OzoneProtos.ReplicationType type,
       OzoneProtos.ReplicationFactor replicationFactor,
       String containerName) throws IOException;
 
@@ -77,4 +77,14 @@ public interface Mapping extends Closeable {
    * @throws IOException
    */
   void deleteContainer(String containerName) throws IOException;
+
+  /**
+   * Update the container state.
+   * @param containerName - Container Name
+   * @param event - container life cycle event
+   * @return - new container state
+   * @throws IOException
+   */
+  OzoneProtos.LifeCycleState updateContainerState(String containerName,
+      OzoneProtos.LifeCycleEvent event) throws IOException;
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java
index f60bdc6ced2..4e8470de0b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java
@@ -106,6 +106,7 @@ public class SCMException extends IOException {
     CHILL_MODE_EXCEPTION,
     FAILED_TO_LOAD_OPEN_CONTAINER,
     FAILED_TO_ALLOCATE_CONTAINER,
+    FAILED_TO_CHANGE_CONTAINER_STATE,
     CONTAINER_EXISTS,
     FAILED_TO_FIND_CONTAINER,
     FAILED_TO_FIND_CONTAINER_WITH_SAPCE,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
index aa529798702..8400ee042a1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -51,7 +51,7 @@ message SCMHeartbeatRequestProto {
   optional ReportState containerReportState = 3;
 }

-enum ContainerState {
+enum DatanodeContainerState {
   closed = 0;
   open = 1;
 }
@@ -76,7 +76,7 @@ SCM database, This information allows SCM to startup faster and avoid having
 all container info in memory all the time.
 */
 message ContainerPersistanceProto {
-  required ContainerState state = 1;
+  required DatanodeContainerState state = 1;
   required hadoop.hdfs.ozone.Pipeline pipeline = 2;
   required ContainerInfo info = 3;
 }
@@ -89,8 +89,6 @@ message NodeContianerMapping {
   repeated string contianerName = 1;
 }

-
-
 /**
 A container report contains the following information.
 */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
index 837d1b77b2f..501475bcdd6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
@@ -319,20 +319,25 @@ public class TestOzoneRpcClient {
       throws IOException, OzoneException {
     String volumeName = UUID.randomUUID().toString();
     String bucketName = UUID.randomUUID().toString();
-    String keyName = UUID.randomUUID().toString();
+
     String value = "sample value";
     ozClient.createVolume(volumeName);
     ozClient.createBucket(volumeName, bucketName);
-    OzoneOutputStream out = ozClient.createKey(volumeName, bucketName,
-        keyName, value.getBytes().length);
-    out.write(value.getBytes());
-    out.close();
-    OzoneKey key = ozClient.getKeyDetails(volumeName, bucketName, keyName);
-    Assert.assertEquals(keyName, key.getKeyName());
-    OzoneInputStream is = ozClient.getKey(volumeName, bucketName, keyName);
-    byte[] fileContent = new byte[value.getBytes().length];
-    is.read(fileContent);
-    Assert.assertEquals(value, new String(fileContent));
+
+    for (int i = 0; i < 10; i++) {
+      String keyName = UUID.randomUUID().toString();
+
+      OzoneOutputStream out = ozClient.createKey(volumeName, bucketName,
+          keyName, value.getBytes().length);
+      out.write(value.getBytes());
+      out.close();
+      OzoneKey key = ozClient.getKeyDetails(volumeName, bucketName, keyName);
+      Assert.assertEquals(keyName, key.getKeyName());
+      OzoneInputStream is = ozClient.getKey(volumeName, bucketName, keyName);
+      byte[] fileContent = new byte[value.getBytes().length];
+      is.read(fileContent);
+      Assert.assertEquals(value, new String(fileContent));
+    }
   }

   @Test
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java
index cc7c9ff2341..79e6af6e28e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.AfterClass;
@@ -80,11 +81,11 @@ public class TestContainerMapping {
   @Test
   public void testallocateContainer() throws Exception {
-    Pipeline pipeline = mapping.allocateContainer(
+    ContainerInfo containerInfo = mapping.allocateContainer(
         xceiverClientManager.getType(),
         xceiverClientManager.getFactor(),
         UUID.randomUUID().toString());
-    Assert.assertNotNull(pipeline);
+    Assert.assertNotNull(containerInfo);
   }

   @Test
@@ -97,13 +98,15 @@ public class TestContainerMapping {
      */
     Set pipelineList = new TreeSet<>();
     for (int x = 0; x < 30; x++) {
-      Pipeline pipeline = mapping.allocateContainer(
+      ContainerInfo containerInfo = mapping.allocateContainer(
           xceiverClientManager.getType(),
           xceiverClientManager.getFactor(),
           UUID.randomUUID().toString());
-      Assert.assertNotNull(pipeline);
-      pipelineList.add(pipeline.getLeader().getDatanodeUuid());
+      Assert.assertNotNull(containerInfo);
+      Assert.assertNotNull(containerInfo.getPipeline());
+      pipelineList.add(containerInfo.getPipeline().getLeader()
+          .getDatanodeUuid());
     }
     Assert.assertTrue(pipelineList.size() > 5);
   }
@@ -113,9 +116,9 @@ public class TestContainerMapping {
     String containerName = UUID.randomUUID().toString();
     Pipeline pipeline = mapping.allocateContainer(
         xceiverClientManager.getType(),
-        xceiverClientManager.getFactor(), containerName);
+        xceiverClientManager.getFactor(), containerName).getPipeline();
     Assert.assertNotNull(pipeline);
-    Pipeline newPipeline = mapping.getContainer(containerName);
+    Pipeline newPipeline = mapping.getContainer(containerName).getPipeline();
     Assert.assertEquals(pipeline.getLeader().getDatanodeUuid(),
         newPipeline.getLeader().getDatanodeUuid());
   }
@@ -125,7 +128,7 @@ public class TestContainerMapping {
     String containerName = UUID.randomUUID().toString();
     Pipeline pipeline = mapping.allocateContainer(
         xceiverClientManager.getType(),
-        xceiverClientManager.getFactor(), containerName);
+        xceiverClientManager.getFactor(), containerName).getPipeline();
     Assert.assertNotNull(pipeline);
     thrown.expectMessage("Specified container already exists.");
     mapping.allocateContainer(xceiverClientManager.getType(),
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java
index 66e5c1efda3..430d34b6025 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java
@@ -154,7 +154,7 @@ public class TestContainerPlacement {
     String container1 = UUID.randomUUID().toString();
     Pipeline pipeline1 = containerManager.allocateContainer(
         xceiverClientManager.getType(),
-        xceiverClientManager.getFactor(), container1);
+        xceiverClientManager.getFactor(), container1).getPipeline();
     assertEquals(xceiverClientManager.getFactor().getNumber(),
         pipeline1.getMachines().size());
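
Reviewer note on the new lifecycle flow: after this patch, allocateContainer persists a ContainerInfo in the ALLOCATED state, and every later transition must pass through updateContainerState, which asks the SCM state machine for the next legal state and rejects illegal transitions with FAILED_TO_CHANGE_CONTAINER_STATE. The transition table itself is built outside this diff, so the sketch below is illustrative only: the event names LifeCycleEvent.begin and LifeCycleEvent.complete (mirroring NotifyObjectCreationStageRequestProto.Stage) and the enum constants STAND_ALONE and ONE are assumptions; only the method signatures come from the hunks above.

// Hedged usage sketch, not part of the patch. "mapping" is any Mapping
// implementation, e.g. the ContainerMapping modified above.
String containerName = UUID.randomUUID().toString();
ContainerInfo info = mapping.allocateContainer(
    OzoneProtos.ReplicationType.STAND_ALONE,   // assumed enum constant
    OzoneProtos.ReplicationFactor.ONE,         // assumed enum constant
    containerName);
// The container store now holds the container in the ALLOCATED state,
// stamped with its state-enter time.
Preconditions.checkState(
    info.getState() == OzoneProtos.LifeCycleState.ALLOCATED);

// Client reports that creation has started, then that it has finished;
// each call persists the state returned by stateMachine.getNextState().
OzoneProtos.LifeCycleState afterBegin = mapping.updateContainerState(
    containerName, OzoneProtos.LifeCycleEvent.begin);      // assumed event name
OzoneProtos.LifeCycleState afterComplete = mapping.updateContainerState(
    containerName, OzoneProtos.LifeCycleEvent.complete);   // assumed event name
// An out-of-order event would instead surface as an SCMException carrying
// the new FAILED_TO_CHANGE_CONTAINER_STATE result code.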