diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java index 745982f4f90..526d6b7aafe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java @@ -147,6 +147,10 @@ public final class ScmConfigKeys { public static final String OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY = "ozone.scm.container.placement.impl"; + public static final String OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE = + "ozone.scm.container.provision_batch_size"; + public static final int OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT = 10; + /** * Never constructed. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/AllocatedBlock.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/AllocatedBlock.java new file mode 100644 index 00000000000..f51336faf96 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/AllocatedBlock.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.scm.container.common.helpers; + +/** + * Allocated block wraps the result returned from SCM#allocateBlock which + * contains a Pipeline and the key. + */ +public final class AllocatedBlock { + private Pipeline pipeline; + private String key; + // Indicates whether the client should create container before writing block. + private boolean shouldCreateContainer; + + /** + * Builder for AllocatedBlock. + */ + public static class Builder { + private Pipeline pipeline; + private String key; + private boolean shouldCreateContainer; + + public Builder setPipeline(Pipeline p) { + this.pipeline = p; + return this; + } + + public Builder setKey(String k) { + this.key = k; + return this; + } + + public Builder setShouldCreateContainer(boolean shouldCreate) { + this.shouldCreateContainer = shouldCreate; + return this; + } + + public AllocatedBlock build() { + return new AllocatedBlock(pipeline, key, shouldCreateContainer); + } + } + + private AllocatedBlock(Pipeline pipeline, String key, + boolean shouldCreateContainer) { + this.pipeline = pipeline; + this.key = key; + this.shouldCreateContainer = shouldCreateContainer; + } + + public Pipeline getPipeline() { + return pipeline; + } + + public String getKey() { + return key; + } + + public boolean getCreateContainer() { + return shouldCreateContainer; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/ScmBlockLocationProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/ScmBlockLocationProtocol.java new file mode 100644 index 00000000000..5bdfd573389 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/ScmBlockLocationProtocol.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.scm.protocol; + +import java.io.IOException; +import java.util.Set; + +import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock; + +/** + * ScmBlockLocationProtocol is used by an HDFS node to find the set of nodes + * to read/write a block. + */ +public interface ScmBlockLocationProtocol { + + /** + * Find the set of nodes to read/write a block, as + * identified by the block key. This method supports batch lookup by + * passing multiple keys. + * + * @param keys batch of block keys to find + * @return allocated blocks for each block key + * @throws IOException if there is any failure + */ + Set getBlockLocations(Set keys) + throws IOException; + + /** + * Asks SCM where a block should be allocated. SCM responds with the + * set of datanodes that should be used creating this block. + * @param size - size of the block. + * @return allocated block accessing info (key, pipeline). + * @throws IOException + */ + AllocatedBlock allocateBlock(long size) throws IOException; + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/ScmLocatedBlock.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/ScmLocatedBlock.java new file mode 100644 index 00000000000..4e4b3d669ea --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/ScmLocatedBlock.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.scm.protocol; + +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; + +/** + * Holds the nodes that currently host the block for a block key. + */ +@InterfaceAudience.Private +public final class ScmLocatedBlock { + private final String key; + private final List locations; + private final DatanodeInfo leader; + + /** + * Creates a ScmLocatedBlock. + * + * @param key object key + * @param locations nodes that currently host the block + * @param leader node that currently acts as pipeline leader + */ + public ScmLocatedBlock(final String key, final List locations, + final DatanodeInfo leader) { + this.key = key; + this.locations = locations; + this.leader = leader; + } + + /** + * Returns the object key. + * + * @return object key + */ + public String getKey() { + return this.key; + } + + /** + * Returns the node that currently acts as pipeline leader. + * + * @return node that currently acts as pipeline leader + */ + public DatanodeInfo getLeader() { + return this.leader; + } + + /** + * Returns the nodes that currently host the block. + * + * @return List nodes that currently host the block + */ + public List getLocations() { + return this.locations; + } + + @Override + public boolean equals(Object otherObj) { + if (otherObj == null) { + return false; + } + if (!(otherObj instanceof ScmLocatedBlock)) { + return false; + } + ScmLocatedBlock other = (ScmLocatedBlock)otherObj; + return this.key == null ? other.key == null : this.key.equals(other.key); + } + + @Override + public int hashCode() { + return key.hashCode(); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "{key=" + key + "; locations=" + + locations.stream().map(loc -> loc.toString()).collect(Collectors + .joining(",")) + "; leader=" + leader + "}"; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto index 5582f2499eb..f3ad7ddd908 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto @@ -84,6 +84,54 @@ message ContainerResponseProto { optional string errorMessage = 3; } +// SCM Block protocol +/** + * keys - batch of block keys to find + */ +message GetScmBlockLocationsRequestProto { + repeated string keys = 1; +} + +/** + * locatedBlocks - for each requested hash, nodes that currently host the + * container for that object key hash + */ +message GetScmBlockLocationsResponseProto { + repeated ScmLocatedBlockProto locatedBlocks = 1; +} + +/** + * Holds the nodes that currently host the blocks for a key. + */ +message ScmLocatedBlockProto { + required string key = 1; + required hadoop.hdfs.ozone.Pipeline pipeline = 2; +} + +/** +* Request send to SCM asking allocate block of specified size. +*/ +message AllocateScmBlockRequestProto { + required uint64 size = 1; +} + +/** + * Reply from SCM indicating that the container. + */ +message AllocateScmBlockResponseProto { + enum Error { + success = 1; + errorNotEnoughSpace = 2; + errorSizeTooBig = 3; + unknownFailure = 4; + } + required Error errorCode = 1; + required string key = 2; + required hadoop.hdfs.ozone.Pipeline pipeline = 3; + required bool createContainer = 4; + optional string errorMessage = 5; +} + /** * Protocol used from an HDFS node to StorageContainerManager. See the request * and response messages for details of the RPC calls. @@ -102,4 +150,18 @@ service StorageContainerLocationProtocolService { */ rpc allocateContainer(ContainerRequestProto) returns (ContainerResponseProto); + + /** + * Find the set of nodes that currently host the block, as + * identified by the key. This method supports batch lookup by + * passing multiple keys. + */ + rpc getScmBlockLocations(GetScmBlockLocationsRequestProto) + returns(GetScmBlockLocationsResponseProto); + + /** + Creates a block entry in SCM. + */ + rpc allocateScmBlock(AllocateScmBlockRequestProto) + returns (AllocateScmBlockResponseProto); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java index b634e56f5ea..619bc67abd6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -27,7 +27,9 @@ import com.google.protobuf.ServiceException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock; import org.apache.hadoop.scm.protocol.LocatedContainer; +import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol; import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol; import org.apache.hadoop.ozone.protocol.proto .StorageContainerLocationProtocolProtos; @@ -37,11 +39,21 @@ import org.apache.hadoop.ozone.protocol.proto import org.apache.hadoop.ozone.protocol.proto .StorageContainerLocationProtocolProtos .GetStorageContainerLocationsResponseProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerLocationProtocolProtos + .ScmLocatedBlockProto; import org.apache.hadoop.ozone.protocol.proto .StorageContainerLocationProtocolProtos.LocatedContainerProto; import org.apache.hadoop.ozone.protocol.proto .StorageContainerLocationProtocolProtos.ContainerResponseProto; - +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerLocationProtocolProtos.AllocateScmBlockRequestProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerLocationProtocolProtos.AllocateScmBlockResponseProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerLocationProtocolProtos.GetScmBlockLocationsRequestProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerLocationProtocolProtos.GetScmBlockLocationsResponseProto; import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB; @@ -55,6 +67,7 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB implements StorageContainerLocationProtocolPB { private final StorageContainerLocationProtocol impl; + private final ScmBlockLocationProtocol blockImpl; /** * Creates a new StorageContainerLocationProtocolServerSideTranslatorPB. @@ -62,8 +75,10 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB * @param impl {@link StorageContainerLocationProtocol} server implementation */ public StorageContainerLocationProtocolServerSideTranslatorPB( - StorageContainerLocationProtocol impl) { + StorageContainerLocationProtocol impl, + ScmBlockLocationProtocol blockImpl) throws IOException { this.impl = impl; + this.blockImpl = blockImpl; } @Override @@ -114,4 +129,57 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB throw new ServiceException(e); } } + + @Override + public GetScmBlockLocationsResponseProto getScmBlockLocations( + RpcController controller, GetScmBlockLocationsRequestProto req) + throws ServiceException { + Set keys = Sets.newLinkedHashSetWithExpectedSize( + req.getKeysCount()); + for (String key : req.getKeysList()) { + keys.add(key); + } + final Set blocks; + try { + blocks = blockImpl.getBlockLocations(keys); + } catch (IOException ex) { + throw new ServiceException(ex); + } + GetScmBlockLocationsResponseProto.Builder resp = + GetScmBlockLocationsResponseProto.newBuilder(); + for (AllocatedBlock block: blocks) { + ScmLocatedBlockProto.Builder locatedBlock = + ScmLocatedBlockProto.newBuilder() + .setKey(block.getKey()) + .setPipeline(block.getPipeline().getProtobufMessage()); + resp.addLocatedBlocks(locatedBlock.build()); + } + return resp.build(); + } + + @Override + public AllocateScmBlockResponseProto allocateScmBlock( + RpcController controller, AllocateScmBlockRequestProto request) + throws ServiceException { + try { + AllocatedBlock allocatedBlock = + blockImpl.allocateBlock(request.getSize()); + if (allocatedBlock != null) { + return StorageContainerLocationProtocolProtos + .AllocateScmBlockResponseProto.newBuilder() + .setKey(allocatedBlock.getKey()) + .setPipeline(allocatedBlock.getPipeline().getProtobufMessage()) + .setCreateContainer(allocatedBlock.getCreateContainer()) + .setErrorCode(AllocateScmBlockResponseProto.Error.success) + .build(); + } else { + return StorageContainerLocationProtocolProtos + .AllocateScmBlockResponseProto.newBuilder() + .setErrorCode(AllocateScmBlockResponseProto.Error.unknownFailure) + .build(); + } + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java index f3112c9a82d..ead8649807d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java @@ -30,7 +30,11 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.ozone.OzoneClientUtils; import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.ozone.scm.block.BlockManager; +import org.apache.hadoop.ozone.scm.block.BlockManagerImpl; import org.apache.hadoop.scm.client.ScmClient; +import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock; +import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol; import org.apache.hadoop.scm.protocol.LocatedContainer; import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol; @@ -80,12 +84,13 @@ import org.slf4j.LoggerFactory; import javax.management.ObjectName; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.Map; -import java.util.HashMap; import static org.apache.hadoop.scm.ScmConfigKeys .OZONE_SCM_CLIENT_ADDRESS_KEY; @@ -114,7 +119,7 @@ import static org.apache.hadoop.util.ExitUtil.terminate; @InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"}) public class StorageContainerManager implements StorageContainerDatanodeProtocol, - StorageContainerLocationProtocol, SCMMXBean { + StorageContainerLocationProtocol, ScmBlockLocationProtocol, SCMMXBean{ private static final Logger LOG = LoggerFactory.getLogger(StorageContainerManager.class); @@ -124,6 +129,7 @@ public class StorageContainerManager */ private final NodeManager scmNodeManager; private final Mapping scmContainerManager; + private final BlockManager scmBlockManager; /** The RPC server that listens to requests from DataNodes. */ private final RPC.Server datanodeRpcServer; @@ -153,6 +159,8 @@ public class StorageContainerManager // TODO : Fix the ClusterID generation code. scmNodeManager = new SCMNodeManager(conf, UUID.randomUUID().toString()); scmContainerManager = new ContainerMapping(conf, scmNodeManager, cacheSize); + scmBlockManager = new BlockManagerImpl(conf, scmNodeManager, + scmContainerManager, cacheSize); RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class, ProtobufRpcEngine.class); @@ -176,7 +184,7 @@ public class StorageContainerManager .StorageContainerLocationProtocolService .newReflectiveBlockingService( new StorageContainerLocationProtocolServerSideTranslatorPB( - this)); + this, this)); final InetSocketAddress scmAddress = OzoneClientUtils.getScmClientBindAddress(conf); @@ -431,6 +439,7 @@ public class StorageContainerManager datanodeRpcServer.stop(); unregisterMXBean(); IOUtils.closeQuietly(scmContainerManager); + IOUtils.closeQuietly(scmBlockManager); } /** @@ -540,4 +549,36 @@ public class StorageContainerManager return scmNodeManager; } + /** + * Get block locations. + * @param keys batch of block keys to retrieve. + * @return set of allocated blocks. + * @throws IOException + */ + @Override + public Set getBlockLocations(final Set keys) + throws IOException { + Set locatedBlocks = new HashSet<>(); + for (String key: keys) { + Pipeline pipeline = scmBlockManager.getBlock(key); + AllocatedBlock block = new AllocatedBlock.Builder() + .setKey(key) + .setPipeline(pipeline).build(); + locatedBlocks.add(block); + } + return locatedBlocks; + } + + /** + * Asks SCM where a block should be allocated. SCM responds with the set + * of datanodes and leader that should be used creating this block. + * + * @param size - size of the block. + * @return - allocated block accessing info (key, pipeline and leader). + * @throws IOException + */ + @Override + public AllocatedBlock allocateBlock(final long size) throws IOException { + return scmBlockManager.allocateBlock(size); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java new file mode 100644 index 00000000000..d1487fc7382 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.scm.block; + +import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; + +import java.io.Closeable; +import java.io.IOException; + +/** + * + * Block APIs. + * Container is transparent to these APIs. + */ +public interface BlockManager extends Closeable { + /** + * Allocates a new block for a given size. + * @param size - size of the block to be allocated + * @return - the allocated pipeline and key for the block + * @throws IOException + */ + AllocatedBlock allocateBlock(long size) throws IOException; + + /** + * Give the key to the block, get the pipeline info. + * @param key - key to the block. + * @return - Pipeline that used to access the block. + * @throws IOException + */ + Pipeline getBlock(String key) throws IOException; + + /** + * Given a key of the block, delete the block. + * @param key - key of the block. + * @throws IOException + */ + void deleteBlock(String key) throws IOException; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java new file mode 100644 index 00000000000..e0d2d089abf --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java @@ -0,0 +1,325 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.scm.block; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.scm.container.Mapping; +import org.apache.hadoop.ozone.scm.exceptions.SCMException; +import org.apache.hadoop.ozone.scm.node.NodeManager; +import org.apache.hadoop.scm.ScmConfigKeys; +import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.utils.LevelDBStore; +import org.iq80.leveldb.DBIterator; +import org.iq80.leveldb.Options; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; + +import java.util.ArrayList; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.stream.Collectors; +import java.util.UUID; + +import static org.apache.hadoop.ozone.scm.exceptions.SCMException + .ResultCodes.CHILL_MODE_EXCEPTION; +import static org.apache.hadoop.ozone.scm.exceptions.SCMException + .ResultCodes.FAILED_TO_ALLOCATE_CONTAINER; +import static org.apache.hadoop.ozone.scm.exceptions.SCMException + .ResultCodes.FAILED_TO_FIND_CONTAINER; +import static org.apache.hadoop.ozone.scm.exceptions.SCMException + .ResultCodes.FAILED_TO_FIND_CONTAINER_WITH_SAPCE; +import static org.apache.hadoop.ozone.scm.exceptions.SCMException + .ResultCodes.FAILED_TO_LOAD_OPEN_CONTAINER; +import static org.apache.hadoop.ozone.scm.exceptions.SCMException + .ResultCodes.INVALID_BLOCK_SIZE; + +/** + * Block Manager manages the block access for SCM. + */ +public class BlockManagerImpl implements BlockManager { + private static final Logger LOG = + LoggerFactory.getLogger(BlockManagerImpl.class); + + private final NodeManager nodeManager; + private final Mapping containerManager; + private final LevelDBStore blockStore; + + private final Lock lock; + private final long containerSize; + private final long cacheSize; + + private final LevelDBStore openContainerStore; + private Map openContainers; + private final int containerProvisionBatchSize; + private final Random rand; + + /** + * Constructor. + * @param conf - configuration. + * @param nodeManager - node manager. + * @param containerManager - container manager. + * @param cacheSizeMB - cache size for level db store. + * @throws IOException + */ + public BlockManagerImpl(final Configuration conf, + final NodeManager nodeManager, final Mapping containerManager, + final int cacheSizeMB) throws IOException { + this.nodeManager = nodeManager; + this.containerManager = containerManager; + this.cacheSize = cacheSizeMB; + String scmMetaDataDir = conf.get(OzoneConfigKeys + .OZONE_CONTAINER_METADATA_DIRS); + if ((scmMetaDataDir == null) || (scmMetaDataDir.isEmpty())) { + throw + new IllegalArgumentException("SCM metadata directory is not valid."); + } + Options options = new Options(); + options.cacheSize(this.cacheSize * OzoneConsts.MB); + options.createIfMissing(); + + // Write the block key to container name mapping. + File blockContainerDbPath = new File(scmMetaDataDir, "block.db"); + blockStore = new LevelDBStore(blockContainerDbPath, options); + + this.containerSize = OzoneConsts.GB * conf.getInt( + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB, + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT); + + // Load store of all open contains for block allocation + File openContainsDbPath = new File(scmMetaDataDir, "openContainers.db"); + openContainerStore = new LevelDBStore(openContainsDbPath, options); + openContainers = new HashMap<>(); + loadOpenContainers(); + + this.containerProvisionBatchSize = conf.getInt( + ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, + ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT); + rand = new Random(); + this.lock = new ReentrantLock(); + } + + // TODO: close full (or almost full) containers with a separate thread. + /** + * Load open containers from persistent store. + * @throws IOException + */ + private void loadOpenContainers() throws IOException { + try (DBIterator iter = openContainerStore.getIterator()) { + for (iter.seekToFirst(); iter.hasNext(); iter.next()) { + try { + byte[] key = iter.peekNext().getKey(); + String containerName = DFSUtil.bytes2String(key); + byte[] value = iter.peekNext().getValue(); + Long containerUsed = Long.parseLong(DFSUtil.bytes2String(value)); + openContainers.put(containerName, containerUsed); + LOG.debug("Loading open container: {} used : {}", containerName, + containerUsed); + } catch (Exception ex) { + LOG.warn("Failed loading open container, continue next..."); + } + } + } catch (IOException e) { + LOG.error("Loading open container store failed." + e); + throw new SCMException("Failed to load open container store", + FAILED_TO_LOAD_OPEN_CONTAINER); + } + } + + /** + * Pre-provision specified count of containers for block creation. + * @param count - number of containers to create. + * @return list of container names created. + * @throws IOException + */ + private List provisionContainers(int count) throws IOException { + List results = new ArrayList(); + lock.lock(); + try { + for (int i = 0; i < count; i++) { + String containerName = UUID.randomUUID().toString(); + try { + Pipeline pipeline = containerManager.allocateContainer(containerName); + if (pipeline == null) { + LOG.warn("Unable to allocate container."); + continue; + } + } catch (IOException ex) { + LOG.warn("Unable to allocate container: " + ex); + continue; + } + openContainers.put(containerName, 0L); + openContainerStore.put(DFSUtil.string2Bytes(containerName), + DFSUtil.string2Bytes(Long.toString(0L))); + results.add(containerName); + } + } finally { + lock.unlock(); + } + return results; + } + + /** + * Allocates a new block for a given size. + * + * SCM choose one of the open containers and returns that as the location for + * the new block. An open container is a container that is actively written to + * via replicated log. + * @param size - size of the block to be allocated + * @return - the allocated pipeline and key for the block + * @throws IOException + */ + @Override + public AllocatedBlock allocateBlock(final long size) throws IOException { + boolean createContainer = false; + Pipeline pipeline; + if (size < 0 || size > containerSize) { + throw new SCMException("Unsupported block size", + INVALID_BLOCK_SIZE); + } + if (!nodeManager.isOutOfNodeChillMode()) { + throw new SCMException("Unable to create block while in chill mode", + CHILL_MODE_EXCEPTION); + } + + lock.lock(); + try { + List candidates; + if (openContainers.size() == 0) { + try { + candidates = provisionContainers(containerProvisionBatchSize); + } catch (IOException ex) { + throw new SCMException("Unable to allocate container for the block", + FAILED_TO_ALLOCATE_CONTAINER); + } + } else { + candidates = openContainers.entrySet().parallelStream() + .filter(e -> (e.getValue() + size < containerSize)) + .map(e -> e.getKey()) + .collect(Collectors.toList()); + } + + if (candidates.size() == 0) { + try { + candidates = provisionContainers(containerProvisionBatchSize); + } catch (IOException ex) { + throw new SCMException("Unable to allocate container for the block", + FAILED_TO_ALLOCATE_CONTAINER); + } + } + + if (candidates.size() == 0) { + throw new SCMException("Fail to find any container to allocate block " + + "of size " + size + ".", FAILED_TO_FIND_CONTAINER_WITH_SAPCE); + } + + int randomIdx = rand.nextInt(candidates.size()); + String containerName = candidates.get(randomIdx); + if (LOG.isDebugEnabled()) { + LOG.debug("Find {} candidates: {}, picking: {}", candidates.size(), + candidates.toString(), containerName); + } + + pipeline = containerManager.getContainer(containerName); + if (pipeline == null) { + LOG.debug("Unable to find container for the block"); + throw new SCMException("Unable to find container to allocate block", + FAILED_TO_FIND_CONTAINER); + } + + // TODO: make block key easier to debug (e.g., seq no) + // Allocate key for the block + String blockKey = UUID.randomUUID().toString(); + AllocatedBlock.Builder abb = new AllocatedBlock.Builder() + .setKey(blockKey).setPipeline(pipeline) + .setShouldCreateContainer(createContainer); + if (pipeline.getMachines().size() > 0) { + blockStore.put(DFSUtil.string2Bytes(blockKey), + DFSUtil.string2Bytes(containerName)); + + // update the container usage information + Long newUsed = openContainers.get(containerName) + size; + openContainers.put(containerName, newUsed); + openContainerStore.put(DFSUtil.string2Bytes(containerName), + DFSUtil.string2Bytes(Long.toString(newUsed))); + return abb.build(); + } + } finally { + lock.unlock(); + } + return null; + } + + /** + * + * Given a block key, return the Pipeline information. + * @param key - block key assigned by SCM. + * @return Pipeline (list of DNs and leader) to access the block. + * @throws IOException + */ + @Override + public Pipeline getBlock(final String key) throws IOException { + lock.lock(); + try { + byte[] containerBytes = blockStore.get(DFSUtil.string2Bytes(key)); + if (containerBytes == null) { + throw new IOException("Specified block key does not exist. key : " + + key); + } + return containerManager.getContainer( + DFSUtil.bytes2String(containerBytes)); + } finally { + lock.unlock(); + } + } + + /** + * Given a block key, delete a block. + * @param key - block key assigned by SCM. + * @throws IOException + */ + @Override + public void deleteBlock(final String key) throws IOException { + throw new NotImplementedException("deleteBlock is not supported"); + } + + /** + * Close the resources for BlockManager. + * @throws IOException + */ + @Override + public void close() throws IOException { + if (blockStore != null) { + blockStore.close(); + } + if (openContainerStore != null) { + openContainerStore.close(); + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/package-info.java new file mode 100644 index 00000000000..3c566277611 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.scm.block; +/** + * This package contains routines to manage the block location and + * mapping inside SCM + */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java index 53f37b5a003..a6d029383c1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java @@ -224,6 +224,10 @@ public class ContainerMapping implements Mapping { pipeline = newPipelineFromNodes(datanodes, containerName); containerStore.put(containerName.getBytes(encoding), pipeline.getProtobufMessage().toByteArray()); + } else { + LOG.debug("Unable to find enough datanodes for new container. " + + "Required {} found {}", replicationFactor, + datanodes != null ? datanodes.size(): 0); } } finally { lock.unlock(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java index c716d290e59..fd4ee26567e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java @@ -100,6 +100,12 @@ public class SCMException extends IOException { FAILED_TO_FIND_HEALTHY_NODES, FAILED_TO_FIND_NODES_WITH_SPACE, FAILED_TO_FIND_SUITABLE_NODE, - INVALID_CAPACITY + INVALID_CAPACITY, + INVALID_BLOCK_SIZE, + CHILL_MODE_EXCEPTION, + FAILED_TO_LOAD_OPEN_CONTAINER, + FAILED_TO_ALLOCATE_CONTAINER, + FAILED_TO_FIND_CONTAINER, + FAILED_TO_FIND_CONTAINER_WITH_SAPCE, } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestBlockManager.java new file mode 100644 index 00000000000..3194cfdb36f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestBlockManager.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.scm.block; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.container.common.SCMTestUtils; +import org.apache.hadoop.ozone.scm.container.ContainerMapping; +import org.apache.hadoop.ozone.scm.container.MockNodeManager; +import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.nio.file.Paths; +import java.util.UUID; + +import static org.apache.hadoop.ozone.OzoneConsts.GB; +import static org.apache.hadoop.ozone.OzoneConsts.MB; + + +/** + * Tests for SCM Block Manager. + */ +public class TestBlockManager { + private static ContainerMapping mapping; + private static MockNodeManager nodeManager; + private static BlockManager blockManager; + private static File testDir; + private final static long DEFAULT_BLOCK_SIZE = 128 * MB; + + @Rule + public ExpectedException thrown = ExpectedException.none(); + @BeforeClass + public static void setUp() throws Exception { + Configuration conf = SCMTestUtils.getConf(); + + URL p = conf.getClass().getResource(""); + String path = p.getPath().concat( + org.apache.hadoop.ozone.scm.container.TestContainerMapping + .class.getSimpleName()); + + conf.set(OzoneConfigKeys.OZONE_CONTAINER_METADATA_DIRS, path); + testDir = Paths.get(path).toFile(); + boolean folderExisted = testDir.exists() || testDir.mkdirs(); + if (!folderExisted) { + throw new IOException("Unable to create test diectory path"); + } + nodeManager = new MockNodeManager(true, 10); + mapping = new ContainerMapping(conf, nodeManager, 128); + blockManager = new BlockManagerImpl(conf, nodeManager, mapping, 128); + } + + @AfterClass + public static void cleanup() throws IOException { + blockManager.close(); + mapping.close(); + FileUtil.fullyDelete(testDir); + } + + @Before + public void clearChillMode() { + nodeManager.setChillmode(false); + } + + @Test + public void testAllocateBlock() throws Exception { + AllocatedBlock block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE); + Assert.assertNotNull(block); + } + + @Test + public void testGetAllocatedBlock() throws IOException { + AllocatedBlock block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE); + Assert.assertNotNull(block); + Pipeline pipeline = blockManager.getBlock(block.getKey()); + Assert.assertEquals(pipeline.getLeader().getDatanodeUuid(), + block.getPipeline().getLeader().getDatanodeUuid()); + } + + @Test + public void testAllocateOversidedBlock() throws IOException { + long size = 6 * GB; + thrown.expectMessage("Unsupported block size"); + AllocatedBlock block = blockManager.allocateBlock(size); + } + + @Test + public void testGetNoneExistentContainer() throws IOException { + String nonExistBlockKey = UUID.randomUUID().toString(); + thrown.expectMessage("Specified block key does not exist."); + blockManager.getBlock(nonExistBlockKey); + } + + @Test + public void testChillModeAllocateBlockFails() throws IOException { + nodeManager.setChillmode(true); + thrown.expectMessage("Unable to create block while in chill mode"); + blockManager.allocateBlock(DEFAULT_BLOCK_SIZE); + } +}