HDFS-11504. Ozone: SCM: Add Block APIs. Contributed by Xiaoyu Yao.

This commit is contained in:
Xiaoyu Yao 2017-04-19 22:34:49 -07:00 committed by Owen O'Malley
parent 720e5c40f6
commit a9ba8b5ded
13 changed files with 948 additions and 7 deletions

View File

@ -147,6 +147,10 @@ public final class ScmConfigKeys {
public static final String OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY = public static final String OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY =
"ozone.scm.container.placement.impl"; "ozone.scm.container.placement.impl";
public static final String OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE =
"ozone.scm.container.provision_batch_size";
public static final int OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT = 10;
/** /**
* Never constructed. * Never constructed.
*/ */

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.scm.container.common.helpers;
/**
* Allocated block wraps the result returned from SCM#allocateBlock which
* contains a Pipeline and the key.
*/
public final class AllocatedBlock {
private Pipeline pipeline;
private String key;
// Indicates whether the client should create container before writing block.
private boolean shouldCreateContainer;
/**
* Builder for AllocatedBlock.
*/
public static class Builder {
private Pipeline pipeline;
private String key;
private boolean shouldCreateContainer;
public Builder setPipeline(Pipeline p) {
this.pipeline = p;
return this;
}
public Builder setKey(String k) {
this.key = k;
return this;
}
public Builder setShouldCreateContainer(boolean shouldCreate) {
this.shouldCreateContainer = shouldCreate;
return this;
}
public AllocatedBlock build() {
return new AllocatedBlock(pipeline, key, shouldCreateContainer);
}
}
private AllocatedBlock(Pipeline pipeline, String key,
boolean shouldCreateContainer) {
this.pipeline = pipeline;
this.key = key;
this.shouldCreateContainer = shouldCreateContainer;
}
public Pipeline getPipeline() {
return pipeline;
}
public String getKey() {
return key;
}
public boolean getCreateContainer() {
return shouldCreateContainer;
}
}

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.scm.protocol;
import java.io.IOException;
import java.util.Set;
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
/**
* ScmBlockLocationProtocol is used by an HDFS node to find the set of nodes
* to read/write a block.
*/
public interface ScmBlockLocationProtocol {
/**
* Find the set of nodes to read/write a block, as
* identified by the block key. This method supports batch lookup by
* passing multiple keys.
*
* @param keys batch of block keys to find
* @return allocated blocks for each block key
* @throws IOException if there is any failure
*/
Set<AllocatedBlock> getBlockLocations(Set<String> keys)
throws IOException;
/**
* Asks SCM where a block should be allocated. SCM responds with the
* set of datanodes that should be used creating this block.
* @param size - size of the block.
* @return allocated block accessing info (key, pipeline).
* @throws IOException
*/
AllocatedBlock allocateBlock(long size) throws IOException;
}

View File

@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.scm.protocol;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
/**
* Holds the nodes that currently host the block for a block key.
*/
@InterfaceAudience.Private
public final class ScmLocatedBlock {
private final String key;
private final List<DatanodeInfo> locations;
private final DatanodeInfo leader;
/**
* Creates a ScmLocatedBlock.
*
* @param key object key
* @param locations nodes that currently host the block
* @param leader node that currently acts as pipeline leader
*/
public ScmLocatedBlock(final String key, final List<DatanodeInfo> locations,
final DatanodeInfo leader) {
this.key = key;
this.locations = locations;
this.leader = leader;
}
/**
* Returns the object key.
*
* @return object key
*/
public String getKey() {
return this.key;
}
/**
* Returns the node that currently acts as pipeline leader.
*
* @return node that currently acts as pipeline leader
*/
public DatanodeInfo getLeader() {
return this.leader;
}
/**
* Returns the nodes that currently host the block.
*
* @return List<DatanodeInfo> nodes that currently host the block
*/
public List<DatanodeInfo> getLocations() {
return this.locations;
}
@Override
public boolean equals(Object otherObj) {
if (otherObj == null) {
return false;
}
if (!(otherObj instanceof ScmLocatedBlock)) {
return false;
}
ScmLocatedBlock other = (ScmLocatedBlock)otherObj;
return this.key == null ? other.key == null : this.key.equals(other.key);
}
@Override
public int hashCode() {
return key.hashCode();
}
@Override
public String toString() {
return getClass().getSimpleName() + "{key=" + key + "; locations="
+ locations.stream().map(loc -> loc.toString()).collect(Collectors
.joining(",")) + "; leader=" + leader + "}";
}
}

View File

@ -84,6 +84,54 @@ message ContainerResponseProto {
optional string errorMessage = 3; optional string errorMessage = 3;
} }
// SCM Block protocol
/**
* keys - batch of block keys to find
*/
message GetScmBlockLocationsRequestProto {
repeated string keys = 1;
}
/**
* locatedBlocks - for each requested hash, nodes that currently host the
* container for that object key hash
*/
message GetScmBlockLocationsResponseProto {
repeated ScmLocatedBlockProto locatedBlocks = 1;
}
/**
* Holds the nodes that currently host the blocks for a key.
*/
message ScmLocatedBlockProto {
required string key = 1;
required hadoop.hdfs.ozone.Pipeline pipeline = 2;
}
/**
* Request send to SCM asking allocate block of specified size.
*/
message AllocateScmBlockRequestProto {
required uint64 size = 1;
}
/**
* Reply from SCM indicating that the container.
*/
message AllocateScmBlockResponseProto {
enum Error {
success = 1;
errorNotEnoughSpace = 2;
errorSizeTooBig = 3;
unknownFailure = 4;
}
required Error errorCode = 1;
required string key = 2;
required hadoop.hdfs.ozone.Pipeline pipeline = 3;
required bool createContainer = 4;
optional string errorMessage = 5;
}
/** /**
* Protocol used from an HDFS node to StorageContainerManager. See the request * Protocol used from an HDFS node to StorageContainerManager. See the request
* and response messages for details of the RPC calls. * and response messages for details of the RPC calls.
@ -102,4 +150,18 @@ service StorageContainerLocationProtocolService {
*/ */
rpc allocateContainer(ContainerRequestProto) returns (ContainerResponseProto); rpc allocateContainer(ContainerRequestProto) returns (ContainerResponseProto);
/**
* Find the set of nodes that currently host the block, as
* identified by the key. This method supports batch lookup by
* passing multiple keys.
*/
rpc getScmBlockLocations(GetScmBlockLocationsRequestProto)
returns(GetScmBlockLocationsResponseProto);
/**
Creates a block entry in SCM.
*/
rpc allocateScmBlock(AllocateScmBlockRequestProto)
returns (AllocateScmBlockResponseProto);
} }

View File

@ -27,7 +27,9 @@ import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.scm.protocol.LocatedContainer; import org.apache.hadoop.scm.protocol.LocatedContainer;
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol; import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.ozone.protocol.proto import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos; .StorageContainerLocationProtocolProtos;
@ -37,11 +39,21 @@ import org.apache.hadoop.ozone.protocol.proto
import org.apache.hadoop.ozone.protocol.proto import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos .StorageContainerLocationProtocolProtos
.GetStorageContainerLocationsResponseProto; .GetStorageContainerLocationsResponseProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos
.ScmLocatedBlockProto;
import org.apache.hadoop.ozone.protocol.proto import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.LocatedContainerProto; .StorageContainerLocationProtocolProtos.LocatedContainerProto;
import org.apache.hadoop.ozone.protocol.proto import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.ContainerResponseProto; .StorageContainerLocationProtocolProtos.ContainerResponseProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.AllocateScmBlockRequestProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.AllocateScmBlockResponseProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.GetScmBlockLocationsRequestProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.GetScmBlockLocationsResponseProto;
import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB; import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
@ -55,6 +67,7 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
implements StorageContainerLocationProtocolPB { implements StorageContainerLocationProtocolPB {
private final StorageContainerLocationProtocol impl; private final StorageContainerLocationProtocol impl;
private final ScmBlockLocationProtocol blockImpl;
/** /**
* Creates a new StorageContainerLocationProtocolServerSideTranslatorPB. * Creates a new StorageContainerLocationProtocolServerSideTranslatorPB.
@ -62,8 +75,10 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
* @param impl {@link StorageContainerLocationProtocol} server implementation * @param impl {@link StorageContainerLocationProtocol} server implementation
*/ */
public StorageContainerLocationProtocolServerSideTranslatorPB( public StorageContainerLocationProtocolServerSideTranslatorPB(
StorageContainerLocationProtocol impl) { StorageContainerLocationProtocol impl,
ScmBlockLocationProtocol blockImpl) throws IOException {
this.impl = impl; this.impl = impl;
this.blockImpl = blockImpl;
} }
@Override @Override
@ -114,4 +129,57 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
throw new ServiceException(e); throw new ServiceException(e);
} }
} }
@Override
public GetScmBlockLocationsResponseProto getScmBlockLocations(
RpcController controller, GetScmBlockLocationsRequestProto req)
throws ServiceException {
Set<String> keys = Sets.newLinkedHashSetWithExpectedSize(
req.getKeysCount());
for (String key : req.getKeysList()) {
keys.add(key);
}
final Set<AllocatedBlock> blocks;
try {
blocks = blockImpl.getBlockLocations(keys);
} catch (IOException ex) {
throw new ServiceException(ex);
}
GetScmBlockLocationsResponseProto.Builder resp =
GetScmBlockLocationsResponseProto.newBuilder();
for (AllocatedBlock block: blocks) {
ScmLocatedBlockProto.Builder locatedBlock =
ScmLocatedBlockProto.newBuilder()
.setKey(block.getKey())
.setPipeline(block.getPipeline().getProtobufMessage());
resp.addLocatedBlocks(locatedBlock.build());
}
return resp.build();
}
@Override
public AllocateScmBlockResponseProto allocateScmBlock(
RpcController controller, AllocateScmBlockRequestProto request)
throws ServiceException {
try {
AllocatedBlock allocatedBlock =
blockImpl.allocateBlock(request.getSize());
if (allocatedBlock != null) {
return StorageContainerLocationProtocolProtos
.AllocateScmBlockResponseProto.newBuilder()
.setKey(allocatedBlock.getKey())
.setPipeline(allocatedBlock.getPipeline().getProtobufMessage())
.setCreateContainer(allocatedBlock.getCreateContainer())
.setErrorCode(AllocateScmBlockResponseProto.Error.success)
.build();
} else {
return StorageContainerLocationProtocolProtos
.AllocateScmBlockResponseProto.newBuilder()
.setErrorCode(AllocateScmBlockResponseProto.Error.unknownFailure)
.build();
}
} catch (IOException e) {
throw new ServiceException(e);
}
}
} }

View File

@ -30,7 +30,11 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.ozone.OzoneClientUtils; import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.scm.block.BlockManager;
import org.apache.hadoop.ozone.scm.block.BlockManagerImpl;
import org.apache.hadoop.scm.client.ScmClient; import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.scm.protocol.LocatedContainer; import org.apache.hadoop.scm.protocol.LocatedContainer;
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol; import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
@ -80,12 +84,13 @@ import org.slf4j.LoggerFactory;
import javax.management.ObjectName; import javax.management.ObjectName;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.Map;
import java.util.HashMap;
import static org.apache.hadoop.scm.ScmConfigKeys import static org.apache.hadoop.scm.ScmConfigKeys
.OZONE_SCM_CLIENT_ADDRESS_KEY; .OZONE_SCM_CLIENT_ADDRESS_KEY;
@ -114,7 +119,7 @@ import static org.apache.hadoop.util.ExitUtil.terminate;
@InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"}) @InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"})
public class StorageContainerManager public class StorageContainerManager
implements StorageContainerDatanodeProtocol, implements StorageContainerDatanodeProtocol,
StorageContainerLocationProtocol, SCMMXBean { StorageContainerLocationProtocol, ScmBlockLocationProtocol, SCMMXBean{
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(StorageContainerManager.class); LoggerFactory.getLogger(StorageContainerManager.class);
@ -124,6 +129,7 @@ public class StorageContainerManager
*/ */
private final NodeManager scmNodeManager; private final NodeManager scmNodeManager;
private final Mapping scmContainerManager; private final Mapping scmContainerManager;
private final BlockManager scmBlockManager;
/** The RPC server that listens to requests from DataNodes. */ /** The RPC server that listens to requests from DataNodes. */
private final RPC.Server datanodeRpcServer; private final RPC.Server datanodeRpcServer;
@ -153,6 +159,8 @@ public class StorageContainerManager
// TODO : Fix the ClusterID generation code. // TODO : Fix the ClusterID generation code.
scmNodeManager = new SCMNodeManager(conf, UUID.randomUUID().toString()); scmNodeManager = new SCMNodeManager(conf, UUID.randomUUID().toString());
scmContainerManager = new ContainerMapping(conf, scmNodeManager, cacheSize); scmContainerManager = new ContainerMapping(conf, scmNodeManager, cacheSize);
scmBlockManager = new BlockManagerImpl(conf, scmNodeManager,
scmContainerManager, cacheSize);
RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class, RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class,
ProtobufRpcEngine.class); ProtobufRpcEngine.class);
@ -176,7 +184,7 @@ public class StorageContainerManager
.StorageContainerLocationProtocolService .StorageContainerLocationProtocolService
.newReflectiveBlockingService( .newReflectiveBlockingService(
new StorageContainerLocationProtocolServerSideTranslatorPB( new StorageContainerLocationProtocolServerSideTranslatorPB(
this)); this, this));
final InetSocketAddress scmAddress = final InetSocketAddress scmAddress =
OzoneClientUtils.getScmClientBindAddress(conf); OzoneClientUtils.getScmClientBindAddress(conf);
@ -431,6 +439,7 @@ public class StorageContainerManager
datanodeRpcServer.stop(); datanodeRpcServer.stop();
unregisterMXBean(); unregisterMXBean();
IOUtils.closeQuietly(scmContainerManager); IOUtils.closeQuietly(scmContainerManager);
IOUtils.closeQuietly(scmBlockManager);
} }
/** /**
@ -540,4 +549,36 @@ public class StorageContainerManager
return scmNodeManager; return scmNodeManager;
} }
/**
* Get block locations.
* @param keys batch of block keys to retrieve.
* @return set of allocated blocks.
* @throws IOException
*/
@Override
public Set<AllocatedBlock> getBlockLocations(final Set<String> keys)
throws IOException {
Set<AllocatedBlock> locatedBlocks = new HashSet<>();
for (String key: keys) {
Pipeline pipeline = scmBlockManager.getBlock(key);
AllocatedBlock block = new AllocatedBlock.Builder()
.setKey(key)
.setPipeline(pipeline).build();
locatedBlocks.add(block);
}
return locatedBlocks;
}
/**
* Asks SCM where a block should be allocated. SCM responds with the set
* of datanodes and leader that should be used creating this block.
*
* @param size - size of the block.
* @return - allocated block accessing info (key, pipeline and leader).
* @throws IOException
*/
@Override
public AllocatedBlock allocateBlock(final long size) throws IOException {
return scmBlockManager.allocateBlock(size);
}
} }

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.scm.block;
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import java.io.Closeable;
import java.io.IOException;
/**
*
* Block APIs.
* Container is transparent to these APIs.
*/
public interface BlockManager extends Closeable {
/**
* Allocates a new block for a given size.
* @param size - size of the block to be allocated
* @return - the allocated pipeline and key for the block
* @throws IOException
*/
AllocatedBlock allocateBlock(long size) throws IOException;
/**
* Give the key to the block, get the pipeline info.
* @param key - key to the block.
* @return - Pipeline that used to access the block.
* @throws IOException
*/
Pipeline getBlock(String key) throws IOException;
/**
* Given a key of the block, delete the block.
* @param key - key of the block.
* @throws IOException
*/
void deleteBlock(String key) throws IOException;
}

View File

@ -0,0 +1,325 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.scm.block;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.scm.container.Mapping;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.utils.LevelDBStore;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.UUID;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException
.ResultCodes.CHILL_MODE_EXCEPTION;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException
.ResultCodes.FAILED_TO_ALLOCATE_CONTAINER;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException
.ResultCodes.FAILED_TO_FIND_CONTAINER;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException
.ResultCodes.FAILED_TO_FIND_CONTAINER_WITH_SAPCE;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException
.ResultCodes.FAILED_TO_LOAD_OPEN_CONTAINER;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException
.ResultCodes.INVALID_BLOCK_SIZE;
/**
* Block Manager manages the block access for SCM.
*/
public class BlockManagerImpl implements BlockManager {
private static final Logger LOG =
LoggerFactory.getLogger(BlockManagerImpl.class);
private final NodeManager nodeManager;
private final Mapping containerManager;
private final LevelDBStore blockStore;
private final Lock lock;
private final long containerSize;
private final long cacheSize;
private final LevelDBStore openContainerStore;
private Map<String, Long> openContainers;
private final int containerProvisionBatchSize;
private final Random rand;
/**
* Constructor.
* @param conf - configuration.
* @param nodeManager - node manager.
* @param containerManager - container manager.
* @param cacheSizeMB - cache size for level db store.
* @throws IOException
*/
public BlockManagerImpl(final Configuration conf,
final NodeManager nodeManager, final Mapping containerManager,
final int cacheSizeMB) throws IOException {
this.nodeManager = nodeManager;
this.containerManager = containerManager;
this.cacheSize = cacheSizeMB;
String scmMetaDataDir = conf.get(OzoneConfigKeys
.OZONE_CONTAINER_METADATA_DIRS);
if ((scmMetaDataDir == null) || (scmMetaDataDir.isEmpty())) {
throw
new IllegalArgumentException("SCM metadata directory is not valid.");
}
Options options = new Options();
options.cacheSize(this.cacheSize * OzoneConsts.MB);
options.createIfMissing();
// Write the block key to container name mapping.
File blockContainerDbPath = new File(scmMetaDataDir, "block.db");
blockStore = new LevelDBStore(blockContainerDbPath, options);
this.containerSize = OzoneConsts.GB * conf.getInt(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT);
// Load store of all open contains for block allocation
File openContainsDbPath = new File(scmMetaDataDir, "openContainers.db");
openContainerStore = new LevelDBStore(openContainsDbPath, options);
openContainers = new HashMap<>();
loadOpenContainers();
this.containerProvisionBatchSize = conf.getInt(
ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT);
rand = new Random();
this.lock = new ReentrantLock();
}
// TODO: close full (or almost full) containers with a separate thread.
/**
* Load open containers from persistent store.
* @throws IOException
*/
private void loadOpenContainers() throws IOException {
try (DBIterator iter = openContainerStore.getIterator()) {
for (iter.seekToFirst(); iter.hasNext(); iter.next()) {
try {
byte[] key = iter.peekNext().getKey();
String containerName = DFSUtil.bytes2String(key);
byte[] value = iter.peekNext().getValue();
Long containerUsed = Long.parseLong(DFSUtil.bytes2String(value));
openContainers.put(containerName, containerUsed);
LOG.debug("Loading open container: {} used : {}", containerName,
containerUsed);
} catch (Exception ex) {
LOG.warn("Failed loading open container, continue next...");
}
}
} catch (IOException e) {
LOG.error("Loading open container store failed." + e);
throw new SCMException("Failed to load open container store",
FAILED_TO_LOAD_OPEN_CONTAINER);
}
}
/**
* Pre-provision specified count of containers for block creation.
* @param count - number of containers to create.
* @return list of container names created.
* @throws IOException
*/
private List<String> provisionContainers(int count) throws IOException {
List<String> results = new ArrayList();
lock.lock();
try {
for (int i = 0; i < count; i++) {
String containerName = UUID.randomUUID().toString();
try {
Pipeline pipeline = containerManager.allocateContainer(containerName);
if (pipeline == null) {
LOG.warn("Unable to allocate container.");
continue;
}
} catch (IOException ex) {
LOG.warn("Unable to allocate container: " + ex);
continue;
}
openContainers.put(containerName, 0L);
openContainerStore.put(DFSUtil.string2Bytes(containerName),
DFSUtil.string2Bytes(Long.toString(0L)));
results.add(containerName);
}
} finally {
lock.unlock();
}
return results;
}
/**
* Allocates a new block for a given size.
*
* SCM choose one of the open containers and returns that as the location for
* the new block. An open container is a container that is actively written to
* via replicated log.
* @param size - size of the block to be allocated
* @return - the allocated pipeline and key for the block
* @throws IOException
*/
@Override
public AllocatedBlock allocateBlock(final long size) throws IOException {
boolean createContainer = false;
Pipeline pipeline;
if (size < 0 || size > containerSize) {
throw new SCMException("Unsupported block size",
INVALID_BLOCK_SIZE);
}
if (!nodeManager.isOutOfNodeChillMode()) {
throw new SCMException("Unable to create block while in chill mode",
CHILL_MODE_EXCEPTION);
}
lock.lock();
try {
List<String> candidates;
if (openContainers.size() == 0) {
try {
candidates = provisionContainers(containerProvisionBatchSize);
} catch (IOException ex) {
throw new SCMException("Unable to allocate container for the block",
FAILED_TO_ALLOCATE_CONTAINER);
}
} else {
candidates = openContainers.entrySet().parallelStream()
.filter(e -> (e.getValue() + size < containerSize))
.map(e -> e.getKey())
.collect(Collectors.toList());
}
if (candidates.size() == 0) {
try {
candidates = provisionContainers(containerProvisionBatchSize);
} catch (IOException ex) {
throw new SCMException("Unable to allocate container for the block",
FAILED_TO_ALLOCATE_CONTAINER);
}
}
if (candidates.size() == 0) {
throw new SCMException("Fail to find any container to allocate block " +
"of size " + size + ".", FAILED_TO_FIND_CONTAINER_WITH_SAPCE);
}
int randomIdx = rand.nextInt(candidates.size());
String containerName = candidates.get(randomIdx);
if (LOG.isDebugEnabled()) {
LOG.debug("Find {} candidates: {}, picking: {}", candidates.size(),
candidates.toString(), containerName);
}
pipeline = containerManager.getContainer(containerName);
if (pipeline == null) {
LOG.debug("Unable to find container for the block");
throw new SCMException("Unable to find container to allocate block",
FAILED_TO_FIND_CONTAINER);
}
// TODO: make block key easier to debug (e.g., seq no)
// Allocate key for the block
String blockKey = UUID.randomUUID().toString();
AllocatedBlock.Builder abb = new AllocatedBlock.Builder()
.setKey(blockKey).setPipeline(pipeline)
.setShouldCreateContainer(createContainer);
if (pipeline.getMachines().size() > 0) {
blockStore.put(DFSUtil.string2Bytes(blockKey),
DFSUtil.string2Bytes(containerName));
// update the container usage information
Long newUsed = openContainers.get(containerName) + size;
openContainers.put(containerName, newUsed);
openContainerStore.put(DFSUtil.string2Bytes(containerName),
DFSUtil.string2Bytes(Long.toString(newUsed)));
return abb.build();
}
} finally {
lock.unlock();
}
return null;
}
/**
*
* Given a block key, return the Pipeline information.
* @param key - block key assigned by SCM.
* @return Pipeline (list of DNs and leader) to access the block.
* @throws IOException
*/
@Override
public Pipeline getBlock(final String key) throws IOException {
lock.lock();
try {
byte[] containerBytes = blockStore.get(DFSUtil.string2Bytes(key));
if (containerBytes == null) {
throw new IOException("Specified block key does not exist. key : " +
key);
}
return containerManager.getContainer(
DFSUtil.bytes2String(containerBytes));
} finally {
lock.unlock();
}
}
/**
* Given a block key, delete a block.
* @param key - block key assigned by SCM.
* @throws IOException
*/
@Override
public void deleteBlock(final String key) throws IOException {
throw new NotImplementedException("deleteBlock is not supported");
}
/**
* Close the resources for BlockManager.
* @throws IOException
*/
@Override
public void close() throws IOException {
if (blockStore != null) {
blockStore.close();
}
if (openContainerStore != null) {
openContainerStore.close();
}
}
}

View File

@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.scm.block;
/**
* This package contains routines to manage the block location and
* mapping inside SCM
*/

View File

@ -224,6 +224,10 @@ public class ContainerMapping implements Mapping {
pipeline = newPipelineFromNodes(datanodes, containerName); pipeline = newPipelineFromNodes(datanodes, containerName);
containerStore.put(containerName.getBytes(encoding), containerStore.put(containerName.getBytes(encoding),
pipeline.getProtobufMessage().toByteArray()); pipeline.getProtobufMessage().toByteArray());
} else {
LOG.debug("Unable to find enough datanodes for new container. " +
"Required {} found {}", replicationFactor,
datanodes != null ? datanodes.size(): 0);
} }
} finally { } finally {
lock.unlock(); lock.unlock();

View File

@ -100,6 +100,12 @@ public class SCMException extends IOException {
FAILED_TO_FIND_HEALTHY_NODES, FAILED_TO_FIND_HEALTHY_NODES,
FAILED_TO_FIND_NODES_WITH_SPACE, FAILED_TO_FIND_NODES_WITH_SPACE,
FAILED_TO_FIND_SUITABLE_NODE, FAILED_TO_FIND_SUITABLE_NODE,
INVALID_CAPACITY INVALID_CAPACITY,
INVALID_BLOCK_SIZE,
CHILL_MODE_EXCEPTION,
FAILED_TO_LOAD_OPEN_CONTAINER,
FAILED_TO_ALLOCATE_CONTAINER,
FAILED_TO_FIND_CONTAINER,
FAILED_TO_FIND_CONTAINER_WITH_SAPCE,
} }
} }

View File

@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.scm.block;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.scm.container.ContainerMapping;
import org.apache.hadoop.ozone.scm.container.MockNodeManager;
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Paths;
import java.util.UUID;
import static org.apache.hadoop.ozone.OzoneConsts.GB;
import static org.apache.hadoop.ozone.OzoneConsts.MB;
/**
* Tests for SCM Block Manager.
*/
public class TestBlockManager {
private static ContainerMapping mapping;
private static MockNodeManager nodeManager;
private static BlockManager blockManager;
private static File testDir;
private final static long DEFAULT_BLOCK_SIZE = 128 * MB;
@Rule
public ExpectedException thrown = ExpectedException.none();
@BeforeClass
public static void setUp() throws Exception {
Configuration conf = SCMTestUtils.getConf();
URL p = conf.getClass().getResource("");
String path = p.getPath().concat(
org.apache.hadoop.ozone.scm.container.TestContainerMapping
.class.getSimpleName());
conf.set(OzoneConfigKeys.OZONE_CONTAINER_METADATA_DIRS, path);
testDir = Paths.get(path).toFile();
boolean folderExisted = testDir.exists() || testDir.mkdirs();
if (!folderExisted) {
throw new IOException("Unable to create test diectory path");
}
nodeManager = new MockNodeManager(true, 10);
mapping = new ContainerMapping(conf, nodeManager, 128);
blockManager = new BlockManagerImpl(conf, nodeManager, mapping, 128);
}
@AfterClass
public static void cleanup() throws IOException {
blockManager.close();
mapping.close();
FileUtil.fullyDelete(testDir);
}
@Before
public void clearChillMode() {
nodeManager.setChillmode(false);
}
@Test
public void testAllocateBlock() throws Exception {
AllocatedBlock block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE);
Assert.assertNotNull(block);
}
@Test
public void testGetAllocatedBlock() throws IOException {
AllocatedBlock block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE);
Assert.assertNotNull(block);
Pipeline pipeline = blockManager.getBlock(block.getKey());
Assert.assertEquals(pipeline.getLeader().getDatanodeUuid(),
block.getPipeline().getLeader().getDatanodeUuid());
}
@Test
public void testAllocateOversidedBlock() throws IOException {
long size = 6 * GB;
thrown.expectMessage("Unsupported block size");
AllocatedBlock block = blockManager.allocateBlock(size);
}
@Test
public void testGetNoneExistentContainer() throws IOException {
String nonExistBlockKey = UUID.randomUUID().toString();
thrown.expectMessage("Specified block key does not exist.");
blockManager.getBlock(nonExistBlockKey);
}
@Test
public void testChillModeAllocateBlockFails() throws IOException {
nodeManager.setChillmode(true);
thrown.expectMessage("Unable to create block while in chill mode");
blockManager.allocateBlock(DEFAULT_BLOCK_SIZE);
}
}