HDFS-11888. Ozone: SCM: use state machine for open containers allocated for key/blocks. Contributed by Xiaoyu Yao.

This commit is contained in:
Xiaoyu Yao 2017-08-23 12:37:09 -07:00
parent 076bd5d64a
commit 8333602a99
20 changed files with 708 additions and 166 deletions

View File

@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
@ -37,8 +38,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
/**
* Maintaining a list of ChunkInputStream. Write based on offset.
@ -60,10 +59,6 @@ public class ChunkGroupOutputStream extends OutputStream {
private long totalSize;
private long byteOffset;
//This has to be removed once HDFS-11888 is resolved.
//local cache which will have list of created container names.
private static Set<String> containersCreated = new HashSet<>();
public ChunkGroupOutputStream() {
this.streamEntries = new ArrayList<>();
this.currentStreamIndex = 0;
@ -293,27 +288,21 @@ public class ChunkGroupOutputStream extends OutputStream {
XceiverClientSpi xceiverClient =
xceiverClientManager.acquireClient(pipeline);
// create container if needed
// TODO : should be subKeyInfo.getShouldCreateContainer(), but for now
//The following change has to reverted once HDFS-11888 is fixed.
if(!containersCreated.contains(containerName)) {
synchronized (containerName.intern()) {
//checking again, there is a chance that some other thread has
// created it.
if (!containersCreated.contains(containerName)) {
LOG.debug("Need to create container {}.", containerName);
try {
ContainerProtocolCalls.createContainer(xceiverClient, requestId);
} catch (StorageContainerException ex) {
if (ex.getResult().equals(Result.CONTAINER_EXISTS)) {
//container already exist.
LOG.debug("Container {} already exists.", containerName);
} else {
LOG.error("Container creation failed for {}.",
containerName, ex);
throw ex;
}
}
containersCreated.add(containerName);
if (subKeyInfo.getShouldCreateContainer()) {
try {
// Block manager sets the container creation stage begin.
ContainerProtocolCalls.createContainer(xceiverClient, requestId);
storageContainerLocationClient.notifyObjectCreationStage(
NotifyObjectCreationStageRequestProto.Type.container,
containerName,
NotifyObjectCreationStageRequestProto.Stage.complete);
} catch (StorageContainerException ex) {
if (ex.getResult().equals(Result.CONTAINER_EXISTS)) {
//container already exist, this should never happen
LOG.debug("Container {} already exists.", containerName);
} else {
LOG.error("Container creation failed for {}.", containerName, ex);
throw ex;
}
}
}

View File

@ -187,7 +187,7 @@ public final class ScmConfigKeys {
public static final String OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE =
"ozone.scm.container.provision_batch_size";
public static final int OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT = 1;
public static final int OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT = 5;
/**

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.scm.client;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerData;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadContainerResponseProto;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.scm.XceiverClientManager;
@ -86,12 +87,20 @@ public class ContainerOperationClient implements ScmClient {
client = xceiverClientManager.acquireClient(pipeline);
String traceID = UUID.randomUUID().toString();
storageContainerLocationClient.notifyObjectCreationStage(
NotifyObjectCreationStageRequestProto.Type.container,
containerId,
NotifyObjectCreationStageRequestProto.Stage.begin);
ContainerProtocolCalls.createContainer(client, traceID);
if (LOG.isDebugEnabled()) {
LOG.debug("Created container " + containerId
+ " leader:" + pipeline.getLeader()
+ " machines:" + pipeline.getMachines());
}
storageContainerLocationClient.notifyObjectCreationStage(
NotifyObjectCreationStageRequestProto.Type.container,
containerId,
NotifyObjectCreationStageRequestProto.Stage.complete);
return pipeline;
} finally {
if (client != null) {
@ -116,11 +125,21 @@ public class ContainerOperationClient implements ScmClient {
// connect to pipeline leader and allocate container on leader datanode.
client = xceiverClientManager.acquireClient(pipeline);
String traceID = UUID.randomUUID().toString();
storageContainerLocationClient.notifyObjectCreationStage(
NotifyObjectCreationStageRequestProto.Type.container,
containerId,
NotifyObjectCreationStageRequestProto.Stage.begin);
ContainerProtocolCalls.createContainer(client, traceID);
LOG.info("Created container " + containerId +
" leader:" + pipeline.getLeader() +
" machines:" + pipeline.getMachines() +
" replication factor:" + factor);
storageContainerLocationClient.notifyObjectCreationStage(
NotifyObjectCreationStageRequestProto.Type.container,
containerId,
NotifyObjectCreationStageRequestProto.Stage.complete);
return pipeline;
} finally {
if (client != null) {

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.scm.container.common.helpers;
/**
* Class wraps container + allocated info for containers managed by block svc.
*/
public class BlockContainerInfo extends ContainerInfo{
private long allocated;
public BlockContainerInfo(ContainerInfo container, long used) {
super(container);
this.allocated = used;
}
public long addAllocated(long size) {
allocated += size;
return allocated;
}
public long subtractAllocated(long size) {
allocated -= size;
return allocated;
}
public long getAllocated() {
return this.allocated;
}
}

View File

@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.scm.container.common.helpers;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.util.Time;
/**
* Class wraps ozone container info.
*/
public class ContainerInfo {
private OzoneProtos.LifeCycleState state;
private Pipeline pipeline;
// The wall-clock ms since the epoch at which the current state enters.
private long stateEnterTime;
ContainerInfo(OzoneProtos.LifeCycleState state, Pipeline pipeline,
long stateEnterTime) {
this.pipeline = pipeline;
this.state = state;
this.stateEnterTime = stateEnterTime;
}
public ContainerInfo(ContainerInfo container) {
this.pipeline = container.getPipeline();
this.state = container.getState();
this.stateEnterTime = container.getStateEnterTime();
}
/**
* Update the current container state and state enter time to now.
* @param state
*/
public void setState(OzoneProtos.LifeCycleState state) {
this.state = state;
this.stateEnterTime = Time.monotonicNow();
}
public OzoneProtos.LifeCycleState getState() {
return state;
}
public long getStateEnterTime() {
return stateEnterTime;
}
public Pipeline getPipeline() {
return pipeline;
}
public OzoneProtos.SCMContainerInfo getProtobuf() {
OzoneProtos.SCMContainerInfo.Builder builder =
OzoneProtos.SCMContainerInfo.newBuilder();
builder.setPipeline(getPipeline().getProtobufMessage());
builder.setState(state);
builder.setStateEnterTime(stateEnterTime);
return builder.build();
}
public static ContainerInfo fromProtobuf(
OzoneProtos.SCMContainerInfo info) {
ContainerInfo.Builder builder = new ContainerInfo.Builder();
builder.setPipeline(Pipeline.getFromProtoBuf(info.getPipeline()));
builder.setState(info.getState());
builder.setStateEnterTime(info.getStateEnterTime());
return builder.build();
}
public static class Builder {
private OzoneProtos.LifeCycleState state;
private Pipeline pipeline;
private long stateEnterTime;
public Builder setState(OzoneProtos.LifeCycleState state) {
this.state = state;
return this;
}
public Builder setPipeline(Pipeline pipeline) {
this.pipeline = pipeline;
return this;
}
public Builder setStateEnterTime(long stateEnterTime) {
this.stateEnterTime = stateEnterTime;
return this;
}
public ContainerInfo build() {
return new ContainerInfo(state, pipeline, stateEnterTime);
}
}
}

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
@ -35,8 +36,8 @@ public interface StorageContainerLocationProtocol {
*
*/
Pipeline allocateContainer(OzoneProtos.ReplicationType replicationType,
OzoneProtos.ReplicationFactor factor,
String containerName) throws IOException;
OzoneProtos.ReplicationFactor factor, String containerName)
throws IOException;
/**
* Ask SCM the location of the container. SCM responds with a group of
@ -85,6 +86,18 @@ public interface StorageContainerLocationProtocol {
OzoneProtos.NodePool queryNode(EnumSet<OzoneProtos.NodeState> nodeStatuses,
OzoneProtos.QueryScope queryScope, String poolName) throws IOException;
/**
* Notify from client when begin or finish creating objects like pipeline
* or containers on datanodes.
* Container will be in Operational state after that.
* @param type object type
* @param name object name
* @param stage creation stage
*/
void notifyObjectCreationStage(
NotifyObjectCreationStageRequestProto.Type type, String name,
NotifyObjectCreationStageRequestProto.Stage stage) throws IOException;
/**
* Creates a replication pipeline of a specified type.
* @param type - replication type
@ -95,5 +108,4 @@ public interface StorageContainerLocationProtocol {
Pipeline createReplicationPipeline(OzoneProtos.ReplicationType type,
OzoneProtos.ReplicationFactor factor, OzoneProtos.NodePool nodePool)
throws IOException;
}

View File

@ -35,9 +35,9 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolPr
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ListContainerResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import java.io.Closeable;
@ -207,6 +207,32 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
}
/**
* Notify from client that creates object on datanodes.
* @param type object type
* @param name object name
* @param stage object creation stage : begin/complete
*/
@Override
public void notifyObjectCreationStage(
NotifyObjectCreationStageRequestProto.Type type,
String name,
NotifyObjectCreationStageRequestProto.Stage stage) throws IOException {
Preconditions.checkState(!Strings.isNullOrEmpty(name),
"Object name cannot be null or empty");
NotifyObjectCreationStageRequestProto request =
NotifyObjectCreationStageRequestProto.newBuilder()
.setType(type)
.setName(name)
.setStage(stage)
.build();
try {
rpcProxy.notifyObjectCreationStage(NULL_RPC_CONTROLLER, request);
} catch(ServiceException e){
throw ProtobufHelper.getRemoteException(e);
}
}
/**
* Creates a replication pipeline of a specified type.
*

View File

@ -35,7 +35,7 @@ message Pipeline {
required string leaderID = 1;
repeated DatanodeIDProto members = 2;
required string containerName = 3;
optional LifeCycleStates state = 4 [default = OPERATIONAL];
optional LifeCycleState state = 4 [default = OPEN];
}
message KeyValue {
@ -72,6 +72,45 @@ message NodePool {
repeated Node nodes = 1;
}
/**
* LifeCycleState for SCM object creation state machine:
* ->Allocated: allocated on SCM but clean has not started creating it yet.
* ->Creating: allocated and assigned to client to create but not ack-ed yet.
* ->Open: allocated on SCM and created on datanodes and ack-ed by a client.
* ->Close: container closed due to space all used or error?
* ->Timeout -> container failed to create on datanodes or ack-ed by client.
* ->Deleting(TBD) -> container will be deleted after timeout
* 1. ALLOCATE-ed containers on SCM can't serve key/block related operation
* until ACK-ed explicitly which changes the state to OPEN.
* 2. Only OPEN/CLOSED containers can serve key/block related operation.
* 3. ALLOCATE-ed containers that are not ACK-ed timely will be TIMEOUT and
* CLEANUP asynchronously.
*/
enum LifeCycleState {
ALLOCATED = 1;
CREATING = 2; // Used for container allocated/created by different client.
OPEN =3; // Mostly an update to SCM via HB or client call.
CLOSED = 4; // !!State after this has not been used yet.
DELETING = 5;
DELETED = 6; // object is deleted.
}
enum LifeCycleEvent {
BEGIN_CREATE = 1; // A request to client to create this object
COMPLETE_CREATE = 2;
CLOSE = 3; // !!Event after this has not been used yet.
UPDATE = 4;
TIMEOUT = 5; // creation has timed out from SCM's View.
DELETE = 6;
CLEANUP = 7;
}
message SCMContainerInfo {
required LifeCycleState state = 1;
required Pipeline pipeline = 2;
optional int64 stateEnterTime = 3;
}
enum ReplicationType {
RATIS = 1;
@ -79,15 +118,7 @@ enum ReplicationType {
CHAINED = 3;
}
enum ReplicationFactor {
ONE = 1;
THREE = 3;
}
enum LifeCycleStates {
CLIENT_CREATE = 1; // A request to client to create this object
OPERATIONAL = 2; // Mostly an update to SCM via HB or client call.
TIMED_OUT = 3; // creation has timed out from SCM's View.
DELETED = 4; // object is deleted.
}

View File

@ -82,6 +82,24 @@ message DeleteContainerResponseProto {
// Empty response
}
message NotifyObjectCreationStageRequestProto {
enum Type {
container = 1;
pipeline = 2;
}
enum Stage {
begin = 1;
complete = 2;
}
required string name = 1;
required Type type = 2;
required Stage stage = 3;
}
message NotifyObjectCreationStageResponseProto {
// Empty response
}
/*
NodeQueryRequest sends a request to SCM asking to send a list of nodes that
match the NodeState that we are requesting.
@ -160,7 +178,12 @@ service StorageContainerLocationProtocolService {
*/
rpc queryNode(NodeQueryRequestProto) returns (NodeQueryResponseProto);
/*
/**
* Notify from client when begin or finish creating container or pipeline on datanodes.
*/
rpc notifyObjectCreationStage(NotifyObjectCreationStageRequestProto) returns (NotifyObjectCreationStageResponseProto);
/*
* Apis that Manage Pipelines.
*
* Pipelines are abstractions offered by SCM and Datanode that allows users
@ -175,5 +198,4 @@ service StorageContainerLocationProtocolService {
*/
rpc allocatePipeline(PipelineRequestProto)
returns (PipelineResponseProto);
}

View File

@ -31,7 +31,7 @@ import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos;
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
import static org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerResponseProto;
@ -39,9 +39,10 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolPr
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.DeleteContainerResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ListContainerResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ListContainerRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineRequestProto;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
@ -159,6 +160,19 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
}
}
@Override
public NotifyObjectCreationStageResponseProto notifyObjectCreationStage(
RpcController controller, NotifyObjectCreationStageRequestProto request)
throws ServiceException {
try {
impl.notifyObjectCreationStage(request.getType(), request.getName(),
request.getStage());
return NotifyObjectCreationStageResponseProto.newBuilder().build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public PipelineResponseProto allocatePipeline(
RpcController controller, PipelineRequestProto request)

View File

@ -53,6 +53,7 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolPr
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SendContainerReportProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
import org.apache.hadoop.ozone.protocolPB.ScmBlockLocationProtocolServerSideTranslatorPB;
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolServerSideTranslatorPB;
@ -65,6 +66,7 @@ import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.scm.node.SCMNodeManager;
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
@ -373,7 +375,7 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
@Override
public Pipeline getContainer(String containerName) throws IOException {
checkAdminAccess();
return scmContainerManager.getContainer(containerName);
return scmContainerManager.getContainer(containerName).getPipeline();
}
/**
@ -424,6 +426,34 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
return poolBuilder.build();
}
/**
* Notify from client when begin/finish creating container/pipeline objects
* on datanodes.
* @param type
* @param name
* @param stage
*/
@Override
public void notifyObjectCreationStage(
NotifyObjectCreationStageRequestProto.Type type, String name,
NotifyObjectCreationStageRequestProto.Stage stage) throws IOException {
if (type == NotifyObjectCreationStageRequestProto.Type.container) {
ContainerInfo info = scmContainerManager.getContainer(name);
LOG.info("Container {} current state {} new stage {}", name,
info.getState(), stage);
if (stage == NotifyObjectCreationStageRequestProto.Stage.begin) {
scmContainerManager.updateContainerState(name,
OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
} else {
scmContainerManager.updateContainerState(name,
OzoneProtos.LifeCycleEvent.COMPLETE_CREATE);
}
} else if (type == NotifyObjectCreationStageRequestProto.Type.pipeline) {
// TODO: pipeline state update will be addressed in future patch.
}
}
/**
* Creates a replication pipeline of a specified type.
*/
@ -503,7 +533,7 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
*
* @param containerName - Name of the container.
* @param replicationFactor - replication factor.
* @return Pipeline.
* @return pipeline
* @throws IOException
*/
@Override
@ -512,7 +542,7 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
throws IOException {
checkAdminAccess();
return scmContainerManager.allocateContainer(replicationType,
replicationFactor, containerName);
replicationFactor, containerName).getPipeline();
}
/**

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.scm.block;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.metrics2.util.MBeans;
@ -29,6 +30,8 @@ import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.BlockContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.utils.BatchOperation;
@ -83,8 +86,11 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
private final long containerSize;
private final long cacheSize;
private final MetadataStore openContainerStore;
private Map<String, Long> openContainers;
// Track all containers owned by block service.
private final MetadataStore containerStore;
private Map<OzoneProtos.LifeCycleState,
Map<String, BlockContainerInfo>> containers;
private final int containerProvisionBatchSize;
private final Random rand;
private final ObjectName mxBean;
@ -121,14 +127,13 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
// Load store of all open contains for block allocation
File openContainsDbPath = new File(scmMetaDataDir, OPEN_CONTAINERS_DB);
openContainerStore = MetadataStoreBuilder.newBuilder()
containerStore = MetadataStoreBuilder.newBuilder()
.setConf(conf)
.setDbFile(openContainsDbPath)
.setCacheSize(this.cacheSize * OzoneConsts.MB)
.build();
openContainers = new ConcurrentHashMap<>();
loadOpenContainers();
loadAllocatedContainers();
this.containerProvisionBatchSize = conf.getInt(
ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE,
@ -141,20 +146,39 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
// TODO: close full (or almost full) containers with a separate thread.
/**
* Load open containers from persistent store.
* Load allocated containers from persistent store.
* @throws IOException
*/
private void loadOpenContainers() throws IOException {
private void loadAllocatedContainers() throws IOException {
// Pre-allocate empty map entry by state to avoid null check
containers = new ConcurrentHashMap<>();
for (OzoneProtos.LifeCycleState state :
OzoneProtos.LifeCycleState.values()) {
containers.put(state, new ConcurrentHashMap());
}
try {
openContainerStore.iterate(null, (key, value) -> {
containerStore.iterate(null, (key, value) -> {
try {
String containerName = DFSUtil.bytes2String(key);
Long containerUsed = Long.parseLong(DFSUtil.bytes2String(value));
openContainers.put(containerName, containerUsed);
LOG.debug("Loading open container: {} used : {}", containerName,
containerUsed);
ContainerInfo containerInfo =
containerManager.getContainer(containerName);
// TODO: remove the container from block manager's container DB
// Most likely the allocated container is timeout and cleaned up
// by SCM, we should clean up correspondingly instead of just skip it.
if (containerInfo == null) {
LOG.warn("Container {} allocated by block service" +
"can't be found in SCM", containerName);
return true;
}
Map<String, BlockContainerInfo> containersByState =
containers.get(containerInfo.getState());
containersByState.put(containerName,
new BlockContainerInfo(containerInfo, containerUsed));
LOG.debug("Loading allocated container: {} used : {} state: {}",
containerName, containerUsed, containerInfo.getState());
} catch (Exception e) {
LOG.warn("Failed loading open container, continue next...");
LOG.warn("Failed loading allocated container, continue next...");
}
return true;
});
@ -166,25 +190,26 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
}
/**
* Pre-provision specified count of containers for block creation.
* @param count - number of containers to create.
* @return list of container names created.
* Pre allocate specified count of containers for block creation.
* @param count - number of containers to allocate.
* @return list of container names allocated.
* @throws IOException
*/
private List<String> provisionContainers(int count) throws IOException {
private List<String> allocateContainers(int count) throws IOException {
List<String> results = new ArrayList();
lock.lock();
try {
for (int i = 0; i < count; i++) {
String containerName = UUID.randomUUID().toString();
ContainerInfo containerInfo = null;
try {
// TODO: Fix this later when Ratis is made the Default.
Pipeline pipeline = containerManager.allocateContainer(
containerInfo = containerManager.allocateContainer(
OzoneProtos.ReplicationType.STAND_ALONE,
OzoneProtos.ReplicationFactor.ONE,
containerName);
if (pipeline == null) {
if (containerInfo == null) {
LOG.warn("Unable to allocate container.");
continue;
}
@ -192,8 +217,12 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
LOG.warn("Unable to allocate container: " + ex);
continue;
}
openContainers.put(containerName, 0L);
openContainerStore.put(DFSUtil.string2Bytes(containerName),
Map<String, BlockContainerInfo> containersByState =
containers.get(OzoneProtos.LifeCycleState.ALLOCATED);
Preconditions.checkNotNull(containersByState);
containersByState.put(containerName,
new BlockContainerInfo(containerInfo, 0));
containerStore.put(DFSUtil.string2Bytes(containerName),
DFSUtil.string2Bytes(Long.toString(0L)));
results.add(containerName);
}
@ -203,6 +232,76 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
return results;
}
/**
* Filter container by states and size.
* @param state the state of the container.
* @param size the minimal available size of the container
* @return allocated containers satisfy both state and size.
*/
private List <String> filterContainers(OzoneProtos.LifeCycleState state,
long size) {
Map<String, BlockContainerInfo> containersByState =
this.containers.get(state);
return containersByState.entrySet().parallelStream()
.filter(e -> ((e.getValue().getAllocated() + size < containerSize)))
.map(e -> e.getKey())
.collect(Collectors.toList());
}
private BlockContainerInfo getContainer(OzoneProtos.LifeCycleState state,
String name) {
Map<String, BlockContainerInfo> containersByState = this.containers.get(state);
return containersByState.get(name);
}
// Relies on the caller such as allocateBlock() to hold the lock
// to ensure containers map consistent.
private void updateContainer(OzoneProtos.LifeCycleState oldState, String name,
OzoneProtos.LifeCycleState newState) {
if (LOG.isDebugEnabled()) {
LOG.debug("Update container {} from state {} to state {}",
name, oldState, newState);
}
Map<String, BlockContainerInfo> containersInOldState =
this.containers.get(oldState);
BlockContainerInfo containerInfo = containersInOldState.get(name);
Preconditions.checkNotNull(containerInfo);
containersInOldState.remove(name);
Map<String, BlockContainerInfo> containersInNewState =
this.containers.get(newState);
containersInNewState.put(name, containerInfo);
}
// Refresh containers that have been allocated.
// We may not need to track all the states, just the creating/open/close
// should be enough for now.
private void refreshContainers() {
Map<String, BlockContainerInfo> containersByState =
this.containers.get(OzoneProtos.LifeCycleState.ALLOCATED);
for (String containerName: containersByState.keySet()) {
try {
ContainerInfo containerInfo =
containerManager.getContainer(containerName);
if (containerInfo == null) {
// TODO: clean up containers that has been deleted on SCM but
// TODO: still in ALLOCATED state in block manager.
LOG.debug("Container {} allocated by block service" +
"can't be found in SCM", containerName);
continue;
}
if (containerInfo.getState() == OzoneProtos.LifeCycleState.OPEN) {
updateContainer(OzoneProtos.LifeCycleState.ALLOCATED, containerName,
containerInfo.getState());
}
// TODO: check containers in other state and refresh as needed.
// TODO: ALLOCATED container that is timeout and DELETED. (unit test)
// TODO: OPEN container that is CLOSE.
} catch (IOException ex) {
LOG.debug("Failed to get container info for: {}", containerName);
}
}
}
/**
* Allocates a new block for a given size.
*
@ -215,8 +314,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
*/
@Override
public AllocatedBlock allocateBlock(final long size) throws IOException {
boolean createContainer;
Pipeline pipeline;
boolean createContainer = false;
if (size < 0 || size > containerSize) {
throw new SCMException("Unsupported block size",
INVALID_BLOCK_SIZE);
@ -228,37 +326,29 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
lock.lock();
try {
refreshContainers();
List<String> candidates;
if (openContainers.size() == 0) {
try {
candidates = provisionContainers(containerProvisionBatchSize);
} catch (IOException ex) {
throw new SCMException("Unable to allocate container for the block",
FAILED_TO_ALLOCATE_CONTAINER);
}
createContainer = true;
} else {
candidates = openContainers.entrySet().parallelStream()
.filter(e -> (e.getValue() + size < containerSize))
.map(e -> e.getKey())
.collect(Collectors.toList());
createContainer = false;
}
candidates = filterContainers(OzoneProtos.LifeCycleState.OPEN, size);
if (candidates.size() == 0) {
try {
candidates = provisionContainers(containerProvisionBatchSize);
} catch (IOException ex) {
throw new SCMException("Unable to allocate container for the block",
FAILED_TO_ALLOCATE_CONTAINER);
candidates = filterContainers(OzoneProtos.LifeCycleState.ALLOCATED,
size);
if (candidates.size() == 0) {
try {
candidates = allocateContainers(containerProvisionBatchSize);
} catch (IOException ex) {
LOG.error("Unable to allocate container for the block.");
throw new SCMException("Unable to allocate container for the block",
FAILED_TO_ALLOCATE_CONTAINER);
}
}
// now we should have some candidates in ALLOCATE state
if (candidates.size() == 0) {
throw new SCMException("Fail to find any container to allocate block " +
"of size " + size + ".", FAILED_TO_FIND_CONTAINER_WITH_SAPCE);
}
}
if (candidates.size() == 0) {
throw new SCMException("Fail to find any container to allocate block " +
"of size " + size + ".", FAILED_TO_FIND_CONTAINER_WITH_SAPCE);
}
// Candidates list now should include only ALLOCATE or OPEN containers
int randomIdx = rand.nextInt(candidates.size());
String containerName = candidates.get(randomIdx);
if (LOG.isDebugEnabled()) {
@ -266,28 +356,46 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
candidates.toString(), containerName);
}
pipeline = containerManager.getContainer(containerName);
if (pipeline == null) {
ContainerInfo containerInfo =
containerManager.getContainer(containerName);
if (containerInfo == null) {
LOG.debug("Unable to find container for the block");
throw new SCMException("Unable to find container to allocate block",
FAILED_TO_FIND_CONTAINER);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Candidate {} state {}", containerName,
containerInfo.getState());
}
// Container must be either OPEN or ALLOCATE state
if (containerInfo.getState() == OzoneProtos.LifeCycleState.ALLOCATED) {
createContainer = true;
}
// TODO: make block key easier to debug (e.g., seq no)
// Allocate key for the block
String blockKey = UUID.randomUUID().toString();
AllocatedBlock.Builder abb = new AllocatedBlock.Builder()
.setKey(blockKey).setPipeline(pipeline)
.setKey(blockKey).setPipeline(containerInfo.getPipeline())
.setShouldCreateContainer(createContainer);
if (pipeline.getMachines().size() > 0) {
if (containerInfo.getPipeline().getMachines().size() > 0) {
blockStore.put(DFSUtil.string2Bytes(blockKey),
DFSUtil.string2Bytes(containerName));
// update the container usage information
Long newUsed = openContainers.get(containerName) + size;
openContainers.put(containerName, newUsed);
openContainerStore.put(DFSUtil.string2Bytes(containerName),
DFSUtil.string2Bytes(Long.toString(newUsed)));
BlockContainerInfo containerInfoUpdate =
getContainer(containerInfo.getState(), containerName);
Preconditions.checkNotNull(containerInfoUpdate);
containerInfoUpdate.addAllocated(size);
containerStore.put(DFSUtil.string2Bytes(containerName),
DFSUtil.string2Bytes(Long.toString(containerInfoUpdate.getAllocated())));
if (createContainer) {
OzoneProtos.LifeCycleState newState =
containerManager.updateContainerState(containerName,
OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
updateContainer(containerInfo.getState(), containerName, newState);
}
return abb.build();
}
} finally {
@ -312,8 +420,16 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
throw new SCMException("Specified block key does not exist. key : " +
key, FAILED_TO_FIND_BLOCK);
}
return containerManager.getContainer(
DFSUtil.bytes2String(containerBytes));
String containerName = DFSUtil.bytes2String(containerBytes);
ContainerInfo containerInfo = containerManager.getContainer(
containerName);
if (containerInfo == null) {
LOG.debug("Container {} allocated by block service" +
"can't be found in SCM", containerName);
throw new SCMException("Unable to find container for the block",
SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
}
return containerInfo.getPipeline();
} finally {
lock.unlock();
}
@ -338,8 +454,9 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
throw new SCMException("Specified block key does not exist. key : " +
key, FAILED_TO_FIND_BLOCK);
}
// TODO: track the block size info so that we can reclaim the container
// TODO: used space when the block is deleted.
BatchOperation batch = new BatchOperation();
containerManager.getContainer(DFSUtil.bytes2String(containerBytes));
String deletedKeyName = getDeletedKeyName(key);
// Add a tombstone for the deleted key
batch.put(DFSUtil.string2Bytes(deletedKeyName), containerBytes);
@ -370,8 +487,8 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
if (blockStore != null) {
blockStore.close();
}
if (openContainerStore != null) {
openContainerStore.close();
if (containerStore != null) {
containerStore.close();
}
MBeans.unregister(mxBean);
@ -379,6 +496,6 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
@Override
public int getOpenContainersNo() {
return openContainers.size();
return containers.get(OzoneProtos.LifeCycleState.OPEN).size();
}
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.ozone.scm.cli;
import com.google.common.base.Preconditions;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
@ -33,8 +34,10 @@ import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.Buck
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeInfo;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeList;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.Pipeline;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.utils.MetadataStore;
@ -482,10 +485,14 @@ public class SQLCLI extends Configured implements Tool {
HashSet<String> uuidChecked = new HashSet<>();
dbStore.iterate(null, (key, value) -> {
String containerName = new String(key, encoding);
Pipeline pipeline = null;
pipeline = Pipeline.parseFrom(value);
ContainerInfo containerInfo = null;
containerInfo = ContainerInfo.fromProtobuf(
OzoneProtos.SCMContainerInfo.PARSER.parseFrom(value));
Preconditions.checkNotNull(containerInfo);
try {
insertContainerDB(conn, containerName, pipeline, uuidChecked);
//TODO: include container state to sqllite schema
insertContainerDB(conn, containerName,
containerInfo.getPipeline().getProtobufMessage(), uuidChecked);
return true;
} catch (SQLException e) {
throw new IOException(e);

View File

@ -20,13 +20,17 @@ package org.apache.hadoop.ozone.scm.container;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.common.statemachine.StateMachine;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
import org.apache.hadoop.utils.MetadataStore;
@ -38,8 +42,10 @@ import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -60,6 +66,9 @@ public class ContainerMapping implements Mapping {
private final MetadataStore containerStore;
private final PipelineSelector pipelineSelector;
private final StateMachine<OzoneProtos.LifeCycleState,
OzoneProtos.LifeCycleEvent> stateMachine;
/**
* Constructs a mapping class that creates mapping between container names and
* pipelines.
@ -88,31 +97,80 @@ public class ContainerMapping implements Mapping {
.build();
this.lock = new ReentrantLock();
this.pipelineSelector = new PipelineSelector(nodeManager, conf);
// Initialize the container state machine.
Set<OzoneProtos.LifeCycleState> finalStates = new HashSet();
finalStates.add(OzoneProtos.LifeCycleState.OPEN);
finalStates.add(OzoneProtos.LifeCycleState.CLOSED);
finalStates.add(OzoneProtos.LifeCycleState.DELETED);
this.stateMachine = new StateMachine<>(
OzoneProtos.LifeCycleState.ALLOCATED, finalStates);
initializeStateMachine();
}
// Client-driven Create State Machine
// States: <ALLOCATED>------------->CREATING----------------->[OPEN]
// Events: (BEGIN_CREATE) | (COMPLETE_CREATE)
// |
// |(TIMEOUT)
// V
// DELETING----------------->[DELETED]
// (CLEANUP)
// SCM Open/Close State Machine
// States: OPEN------------------>[CLOSED]
// Events: (CLOSE)
// Delete State Machine
// States: OPEN------------------>DELETING------------------>[DELETED]
// Events: (DELETE) (CLEANUP)
private void initializeStateMachine() {
stateMachine.addTransition(OzoneProtos.LifeCycleState.ALLOCATED,
OzoneProtos.LifeCycleState.CREATING,
OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
stateMachine.addTransition(OzoneProtos.LifeCycleState.CREATING,
OzoneProtos.LifeCycleState.OPEN,
OzoneProtos.LifeCycleEvent.COMPLETE_CREATE);
stateMachine.addTransition(OzoneProtos.LifeCycleState.OPEN,
OzoneProtos.LifeCycleState.CLOSED,
OzoneProtos.LifeCycleEvent.CLOSE);
stateMachine.addTransition(OzoneProtos.LifeCycleState.OPEN,
OzoneProtos.LifeCycleState.DELETING,
OzoneProtos.LifeCycleEvent.DELETE);
stateMachine.addTransition(OzoneProtos.LifeCycleState.DELETING,
OzoneProtos.LifeCycleState.DELETED,
OzoneProtos.LifeCycleEvent.CLEANUP);
// Creating timeout -> Deleting
stateMachine.addTransition(OzoneProtos.LifeCycleState.CREATING,
OzoneProtos.LifeCycleState.DELETING,
OzoneProtos.LifeCycleEvent.TIMEOUT);
}
/**
* Returns the Pipeline from the container name.
*
* @param containerName - Name
* @return - Pipeline that makes up this container.
* {@inheritDoc}
*/
@Override
public Pipeline getContainer(final String containerName) throws IOException {
Pipeline pipeline;
public ContainerInfo getContainer(final String containerName) throws IOException {
ContainerInfo containerInfo;
lock.lock();
try {
byte[] pipelineBytes =
byte[] containerBytes =
containerStore.get(containerName.getBytes(encoding));
if (pipelineBytes == null) {
if (containerBytes == null) {
throw new SCMException("Specified key does not exist. key : " +
containerName, SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
}
pipeline = Pipeline.getFromProtoBuf(
OzoneProtos.Pipeline.PARSER.parseFrom(pipelineBytes));
return pipeline;
containerInfo = ContainerInfo.fromProtobuf(
OzoneProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes));
return containerInfo;
} finally {
lock.unlock();
}
@ -138,10 +196,13 @@ public class ContainerMapping implements Mapping {
containerStore.getRangeKVs(startKey, count, prefixFilter);
// Transform the values into the pipelines.
// TODO: return list of ContainerInfo instead of pipelines.
// TODO: filter by container state
for (Map.Entry<byte[], byte[]> entry : range) {
Pipeline pipeline = Pipeline.getFromProtoBuf(
OzoneProtos.Pipeline.PARSER.parseFrom(entry.getValue()));
pipelineList.add(pipeline);
ContainerInfo containerInfo = ContainerInfo.fromProtobuf(
OzoneProtos.SCMContainerInfo.PARSER.parseFrom(entry.getValue()));
Preconditions.checkNotNull(containerInfo);
pipelineList.add(containerInfo.getPipeline());
}
} finally {
lock.unlock();
@ -158,12 +219,12 @@ public class ContainerMapping implements Mapping {
* @throws IOException - Exception
*/
@Override
public Pipeline allocateContainer(OzoneProtos.ReplicationType type,
public ContainerInfo allocateContainer(OzoneProtos.ReplicationType type,
OzoneProtos.ReplicationFactor replicationFactor,
final String containerName) throws IOException {
Preconditions.checkNotNull(containerName);
Preconditions.checkState(!containerName.isEmpty());
Pipeline pipeline = null;
ContainerInfo containerInfo = null;
if (!nodeManager.isOutOfNodeChillMode()) {
throw new SCMException("Unable to create container while in chill mode",
SCMException.ResultCodes.CHILL_MODE_EXCEPTION);
@ -171,20 +232,25 @@ public class ContainerMapping implements Mapping {
lock.lock();
try {
byte[] pipelineBytes =
byte[] containerBytes =
containerStore.get(containerName.getBytes(encoding));
if (pipelineBytes != null) {
if (containerBytes != null) {
throw new SCMException("Specified container already exists. key : " +
containerName, SCMException.ResultCodes.CONTAINER_EXISTS);
}
pipeline = pipelineSelector.getReplicationPipeline(type,
Pipeline pipeline = pipelineSelector.getReplicationPipeline(type,
replicationFactor, containerName);
containerInfo = new ContainerInfo.Builder()
.setState(OzoneProtos.LifeCycleState.ALLOCATED)
.setPipeline(pipeline)
.setStateEnterTime(Time.monotonicNow())
.build();
containerStore.put(containerName.getBytes(encoding),
pipeline.getProtobufMessage().toByteArray());
containerInfo.getProtobuf().toByteArray());
} finally {
lock.unlock();
}
return pipeline;
return containerInfo;
}
/**
@ -199,9 +265,9 @@ public class ContainerMapping implements Mapping {
lock.lock();
try {
byte[] dbKey = containerName.getBytes(encoding);
byte[] pipelineBytes =
byte[] containerBytes =
containerStore.get(dbKey);
if (pipelineBytes == null) {
if(containerBytes == null) {
throw new SCMException("Failed to delete container "
+ containerName + ", reason : container doesn't exist.",
SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
@ -212,6 +278,44 @@ public class ContainerMapping implements Mapping {
}
}
/**
* {@inheritDoc}
* Used by client to update container state on SCM.
*/
@Override
public OzoneProtos.LifeCycleState updateContainerState(String containerName,
OzoneProtos.LifeCycleEvent event) throws IOException {
ContainerInfo containerInfo;
lock.lock();
try {
byte[] dbKey = containerName.getBytes(encoding);
byte[] containerBytes =
containerStore.get(dbKey);
if(containerBytes == null) {
throw new SCMException("Failed to update container state"
+ containerName + ", reason : container doesn't exist.",
SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
}
containerInfo = ContainerInfo.fromProtobuf(
OzoneProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes));
OzoneProtos.LifeCycleState newState;
try {
newState = stateMachine.getNextState(containerInfo.getState(), event);
} catch (InvalidStateTransitionException ex) {
throw new SCMException("Failed to update container state"
+ containerName + ", reason : invalid state transition from state: "
+ containerInfo.getState() + " upon event: " + event + ".",
SCMException.ResultCodes.FAILED_TO_CHANGE_CONTAINER_STATE);
}
containerInfo.setState(newState);
containerStore.put(dbKey, containerInfo.getProtobuf().toByteArray());
return newState;
} finally {
lock.unlock();
}
}
/**
* Closes this stream and releases any system resources associated with it. If
* the stream is already closed then invoking this method has no effect.

View File

@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.scm.container;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import java.io.Closeable;
@ -30,13 +31,13 @@ import java.util.List;
*/
public interface Mapping extends Closeable {
/**
* Returns the Pipeline from the container name.
* Returns the ContainerInfo from the container name.
*
* @param containerName - Name
* @return - Pipeline that makes up this container.
* @return - ContainerInfo such as creation state and the pipeline.
* @throws IOException
*/
Pipeline getContainer(String containerName) throws IOException;
ContainerInfo getContainer(String containerName) throws IOException;
/**
* Returns pipelines under certain conditions.
@ -57,16 +58,15 @@ public interface Mapping extends Closeable {
List<Pipeline> listContainer(String startName, String prefixName, int count)
throws IOException;
/**
* Allocates a new container for a given keyName and replication factor.
*
* @param containerName - Name.
* @param replicationFactor - replication factor of the container.
* @return - Pipeline that makes up this container.
* @return - Container Info.
* @throws IOException
*/
Pipeline allocateContainer(OzoneProtos.ReplicationType type,
ContainerInfo allocateContainer(OzoneProtos.ReplicationType type,
OzoneProtos.ReplicationFactor replicationFactor,
String containerName) throws IOException;
@ -77,4 +77,14 @@ public interface Mapping extends Closeable {
* @throws IOException
*/
void deleteContainer(String containerName) throws IOException;
/**
* Update container state.
* @param containerName - Container Name
* @param event - container life cycle event
* @return - new container state
* @throws IOException
*/
OzoneProtos.LifeCycleState updateContainerState(String containerName,
OzoneProtos.LifeCycleEvent event) throws IOException;
}

View File

@ -106,6 +106,7 @@ public class SCMException extends IOException {
CHILL_MODE_EXCEPTION,
FAILED_TO_LOAD_OPEN_CONTAINER,
FAILED_TO_ALLOCATE_CONTAINER,
FAILED_TO_CHANGE_CONTAINER_STATE,
CONTAINER_EXISTS,
FAILED_TO_FIND_CONTAINER,
FAILED_TO_FIND_CONTAINER_WITH_SAPCE,

View File

@ -51,7 +51,7 @@ message SCMHeartbeatRequestProto {
optional ReportState containerReportState = 3;
}
enum ContainerState {
enum DatanodeContainerState {
closed = 0;
open = 1;
}
@ -76,7 +76,7 @@ SCM database, This information allows SCM to startup faster and avoid having
all container info in memory all the time.
*/
message ContainerPersistanceProto {
required ContainerState state = 1;
required DatanodeContainerState state = 1;
required hadoop.hdfs.ozone.Pipeline pipeline = 2;
required ContainerInfo info = 3;
}
@ -89,8 +89,6 @@ message NodeContianerMapping {
repeated string contianerName = 1;
}
/**
A container report contains the following information.
*/

View File

@ -319,20 +319,25 @@ public class TestOzoneRpcClient {
throws IOException, OzoneException {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
String keyName = UUID.randomUUID().toString();
String value = "sample value";
ozClient.createVolume(volumeName);
ozClient.createBucket(volumeName, bucketName);
OzoneOutputStream out = ozClient.createKey(volumeName, bucketName,
keyName, value.getBytes().length);
out.write(value.getBytes());
out.close();
OzoneKey key = ozClient.getKeyDetails(volumeName, bucketName, keyName);
Assert.assertEquals(keyName, key.getKeyName());
OzoneInputStream is = ozClient.getKey(volumeName, bucketName, keyName);
byte[] fileContent = new byte[value.getBytes().length];
is.read(fileContent);
Assert.assertEquals(value, new String(fileContent));
for (int i = 0; i < 10; i++) {
String keyName = UUID.randomUUID().toString();
OzoneOutputStream out = ozClient.createKey(volumeName, bucketName,
keyName, value.getBytes().length);
out.write(value.getBytes());
out.close();
OzoneKey key = ozClient.getKeyDetails(volumeName, bucketName, keyName);
Assert.assertEquals(keyName, key.getKeyName());
OzoneInputStream is = ozClient.getKey(volumeName, bucketName, keyName);
byte[] fileContent = new byte[value.getBytes().length];
is.read(fileContent);
Assert.assertEquals(value, new String(fileContent));
}
}
@Test

View File

@ -21,6 +21,7 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.AfterClass;
@ -80,11 +81,11 @@ public class TestContainerMapping {
@Test
public void testallocateContainer() throws Exception {
Pipeline pipeline = mapping.allocateContainer(
ContainerInfo containerInfo = mapping.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
UUID.randomUUID().toString());
Assert.assertNotNull(pipeline);
Assert.assertNotNull(containerInfo);
}
@Test
@ -97,13 +98,15 @@ public class TestContainerMapping {
*/
Set<String> pipelineList = new TreeSet<>();
for (int x = 0; x < 30; x++) {
Pipeline pipeline = mapping.allocateContainer(
ContainerInfo containerInfo = mapping.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
UUID.randomUUID().toString());
Assert.assertNotNull(pipeline);
pipelineList.add(pipeline.getLeader().getDatanodeUuid());
Assert.assertNotNull(containerInfo);
Assert.assertNotNull(containerInfo.getPipeline());
pipelineList.add(containerInfo.getPipeline().getLeader()
.getDatanodeUuid());
}
Assert.assertTrue(pipelineList.size() > 5);
}
@ -113,9 +116,9 @@ public class TestContainerMapping {
String containerName = UUID.randomUUID().toString();
Pipeline pipeline = mapping.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerName);
xceiverClientManager.getFactor(), containerName).getPipeline();
Assert.assertNotNull(pipeline);
Pipeline newPipeline = mapping.getContainer(containerName);
Pipeline newPipeline = mapping.getContainer(containerName).getPipeline();
Assert.assertEquals(pipeline.getLeader().getDatanodeUuid(),
newPipeline.getLeader().getDatanodeUuid());
}
@ -125,7 +128,7 @@ public class TestContainerMapping {
String containerName = UUID.randomUUID().toString();
Pipeline pipeline = mapping.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerName);
xceiverClientManager.getFactor(), containerName).getPipeline();
Assert.assertNotNull(pipeline);
thrown.expectMessage("Specified container already exists.");
mapping.allocateContainer(xceiverClientManager.getType(),

View File

@ -154,7 +154,7 @@ public class TestContainerPlacement {
String container1 = UUID.randomUUID().toString();
Pipeline pipeline1 = containerManager.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container1);
xceiverClientManager.getFactor(), container1).getPipeline();
assertEquals(xceiverClientManager.getFactor().getNumber(),
pipeline1.getMachines().size());