HDFS-11504. Ozone: SCM: Add Block APIs. Contributed by Xiaoyu Yao.
This commit is contained in:
parent
0bf416ac40
commit
8ec13a397f
|
@ -147,6 +147,10 @@ public final class ScmConfigKeys {
|
||||||
public static final String OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY =
|
public static final String OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY =
|
||||||
"ozone.scm.container.placement.impl";
|
"ozone.scm.container.placement.impl";
|
||||||
|
|
||||||
|
public static final String OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE =
|
||||||
|
"ozone.scm.container.provision_batch_size";
|
||||||
|
public static final int OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT = 10;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Never constructed.
|
* Never constructed.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -0,0 +1,77 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.scm.container.common.helpers;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allocated block wraps the result returned from SCM#allocateBlock which
|
||||||
|
* contains a Pipeline and the key.
|
||||||
|
*/
|
||||||
|
public final class AllocatedBlock {
|
||||||
|
private Pipeline pipeline;
|
||||||
|
private String key;
|
||||||
|
// Indicates whether the client should create container before writing block.
|
||||||
|
private boolean shouldCreateContainer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Builder for AllocatedBlock.
|
||||||
|
*/
|
||||||
|
public static class Builder {
|
||||||
|
private Pipeline pipeline;
|
||||||
|
private String key;
|
||||||
|
private boolean shouldCreateContainer;
|
||||||
|
|
||||||
|
public Builder setPipeline(Pipeline p) {
|
||||||
|
this.pipeline = p;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder setKey(String k) {
|
||||||
|
this.key = k;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder setShouldCreateContainer(boolean shouldCreate) {
|
||||||
|
this.shouldCreateContainer = shouldCreate;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public AllocatedBlock build() {
|
||||||
|
return new AllocatedBlock(pipeline, key, shouldCreateContainer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private AllocatedBlock(Pipeline pipeline, String key,
|
||||||
|
boolean shouldCreateContainer) {
|
||||||
|
this.pipeline = pipeline;
|
||||||
|
this.key = key;
|
||||||
|
this.shouldCreateContainer = shouldCreateContainer;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Pipeline getPipeline() {
|
||||||
|
return pipeline;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getKey() {
|
||||||
|
return key;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean getCreateContainer() {
|
||||||
|
return shouldCreateContainer;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,53 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.scm.protocol;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ScmBlockLocationProtocol is used by an HDFS node to find the set of nodes
|
||||||
|
* to read/write a block.
|
||||||
|
*/
|
||||||
|
public interface ScmBlockLocationProtocol {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Find the set of nodes to read/write a block, as
|
||||||
|
* identified by the block key. This method supports batch lookup by
|
||||||
|
* passing multiple keys.
|
||||||
|
*
|
||||||
|
* @param keys batch of block keys to find
|
||||||
|
* @return allocated blocks for each block key
|
||||||
|
* @throws IOException if there is any failure
|
||||||
|
*/
|
||||||
|
Set<AllocatedBlock> getBlockLocations(Set<String> keys)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Asks SCM where a block should be allocated. SCM responds with the
|
||||||
|
* set of datanodes that should be used creating this block.
|
||||||
|
* @param size - size of the block.
|
||||||
|
* @return allocated block accessing info (key, pipeline).
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
AllocatedBlock allocateBlock(long size) throws IOException;
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,100 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.scm.protocol;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Holds the nodes that currently host the block for a block key.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public final class ScmLocatedBlock {
|
||||||
|
private final String key;
|
||||||
|
private final List<DatanodeInfo> locations;
|
||||||
|
private final DatanodeInfo leader;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a ScmLocatedBlock.
|
||||||
|
*
|
||||||
|
* @param key object key
|
||||||
|
* @param locations nodes that currently host the block
|
||||||
|
* @param leader node that currently acts as pipeline leader
|
||||||
|
*/
|
||||||
|
public ScmLocatedBlock(final String key, final List<DatanodeInfo> locations,
|
||||||
|
final DatanodeInfo leader) {
|
||||||
|
this.key = key;
|
||||||
|
this.locations = locations;
|
||||||
|
this.leader = leader;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the object key.
|
||||||
|
*
|
||||||
|
* @return object key
|
||||||
|
*/
|
||||||
|
public String getKey() {
|
||||||
|
return this.key;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the node that currently acts as pipeline leader.
|
||||||
|
*
|
||||||
|
* @return node that currently acts as pipeline leader
|
||||||
|
*/
|
||||||
|
public DatanodeInfo getLeader() {
|
||||||
|
return this.leader;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the nodes that currently host the block.
|
||||||
|
*
|
||||||
|
* @return List<DatanodeInfo> nodes that currently host the block
|
||||||
|
*/
|
||||||
|
public List<DatanodeInfo> getLocations() {
|
||||||
|
return this.locations;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object otherObj) {
|
||||||
|
if (otherObj == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!(otherObj instanceof ScmLocatedBlock)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
ScmLocatedBlock other = (ScmLocatedBlock)otherObj;
|
||||||
|
return this.key == null ? other.key == null : this.key.equals(other.key);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return key.hashCode();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return getClass().getSimpleName() + "{key=" + key + "; locations="
|
||||||
|
+ locations.stream().map(loc -> loc.toString()).collect(Collectors
|
||||||
|
.joining(",")) + "; leader=" + leader + "}";
|
||||||
|
}
|
||||||
|
}
|
|
@ -84,6 +84,54 @@ message ContainerResponseProto {
|
||||||
optional string errorMessage = 3;
|
optional string errorMessage = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SCM Block protocol
|
||||||
|
/**
|
||||||
|
* keys - batch of block keys to find
|
||||||
|
*/
|
||||||
|
message GetScmBlockLocationsRequestProto {
|
||||||
|
repeated string keys = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* locatedBlocks - for each requested hash, nodes that currently host the
|
||||||
|
* container for that object key hash
|
||||||
|
*/
|
||||||
|
message GetScmBlockLocationsResponseProto {
|
||||||
|
repeated ScmLocatedBlockProto locatedBlocks = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Holds the nodes that currently host the blocks for a key.
|
||||||
|
*/
|
||||||
|
message ScmLocatedBlockProto {
|
||||||
|
required string key = 1;
|
||||||
|
required hadoop.hdfs.ozone.Pipeline pipeline = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Request send to SCM asking allocate block of specified size.
|
||||||
|
*/
|
||||||
|
message AllocateScmBlockRequestProto {
|
||||||
|
required uint64 size = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reply from SCM indicating that the container.
|
||||||
|
*/
|
||||||
|
message AllocateScmBlockResponseProto {
|
||||||
|
enum Error {
|
||||||
|
success = 1;
|
||||||
|
errorNotEnoughSpace = 2;
|
||||||
|
errorSizeTooBig = 3;
|
||||||
|
unknownFailure = 4;
|
||||||
|
}
|
||||||
|
required Error errorCode = 1;
|
||||||
|
required string key = 2;
|
||||||
|
required hadoop.hdfs.ozone.Pipeline pipeline = 3;
|
||||||
|
required bool createContainer = 4;
|
||||||
|
optional string errorMessage = 5;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Protocol used from an HDFS node to StorageContainerManager. See the request
|
* Protocol used from an HDFS node to StorageContainerManager. See the request
|
||||||
* and response messages for details of the RPC calls.
|
* and response messages for details of the RPC calls.
|
||||||
|
@ -102,4 +150,18 @@ service StorageContainerLocationProtocolService {
|
||||||
*/
|
*/
|
||||||
rpc allocateContainer(ContainerRequestProto) returns (ContainerResponseProto);
|
rpc allocateContainer(ContainerRequestProto) returns (ContainerResponseProto);
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Find the set of nodes that currently host the block, as
|
||||||
|
* identified by the key. This method supports batch lookup by
|
||||||
|
* passing multiple keys.
|
||||||
|
*/
|
||||||
|
rpc getScmBlockLocations(GetScmBlockLocationsRequestProto)
|
||||||
|
returns(GetScmBlockLocationsResponseProto);
|
||||||
|
|
||||||
|
/**
|
||||||
|
Creates a block entry in SCM.
|
||||||
|
*/
|
||||||
|
rpc allocateScmBlock(AllocateScmBlockRequestProto)
|
||||||
|
returns (AllocateScmBlockResponseProto);
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,7 +27,9 @@ import com.google.protobuf.ServiceException;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
|
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
|
||||||
|
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
|
||||||
import org.apache.hadoop.scm.protocol.LocatedContainer;
|
import org.apache.hadoop.scm.protocol.LocatedContainer;
|
||||||
|
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
|
||||||
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
|
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.StorageContainerLocationProtocolProtos;
|
.StorageContainerLocationProtocolProtos;
|
||||||
|
@ -37,11 +39,21 @@ import org.apache.hadoop.ozone.protocol.proto
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.StorageContainerLocationProtocolProtos
|
.StorageContainerLocationProtocolProtos
|
||||||
.GetStorageContainerLocationsResponseProto;
|
.GetStorageContainerLocationsResponseProto;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.StorageContainerLocationProtocolProtos
|
||||||
|
.ScmLocatedBlockProto;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.StorageContainerLocationProtocolProtos.LocatedContainerProto;
|
.StorageContainerLocationProtocolProtos.LocatedContainerProto;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.StorageContainerLocationProtocolProtos.ContainerResponseProto;
|
.StorageContainerLocationProtocolProtos.ContainerResponseProto;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.StorageContainerLocationProtocolProtos.AllocateScmBlockRequestProto;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.StorageContainerLocationProtocolProtos.AllocateScmBlockResponseProto;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.StorageContainerLocationProtocolProtos.GetScmBlockLocationsRequestProto;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.StorageContainerLocationProtocolProtos.GetScmBlockLocationsResponseProto;
|
||||||
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||||
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
|
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
|
||||||
|
|
||||||
|
@ -55,6 +67,7 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
|
||||||
implements StorageContainerLocationProtocolPB {
|
implements StorageContainerLocationProtocolPB {
|
||||||
|
|
||||||
private final StorageContainerLocationProtocol impl;
|
private final StorageContainerLocationProtocol impl;
|
||||||
|
private final ScmBlockLocationProtocol blockImpl;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new StorageContainerLocationProtocolServerSideTranslatorPB.
|
* Creates a new StorageContainerLocationProtocolServerSideTranslatorPB.
|
||||||
|
@ -62,8 +75,10 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
|
||||||
* @param impl {@link StorageContainerLocationProtocol} server implementation
|
* @param impl {@link StorageContainerLocationProtocol} server implementation
|
||||||
*/
|
*/
|
||||||
public StorageContainerLocationProtocolServerSideTranslatorPB(
|
public StorageContainerLocationProtocolServerSideTranslatorPB(
|
||||||
StorageContainerLocationProtocol impl) {
|
StorageContainerLocationProtocol impl,
|
||||||
|
ScmBlockLocationProtocol blockImpl) throws IOException {
|
||||||
this.impl = impl;
|
this.impl = impl;
|
||||||
|
this.blockImpl = blockImpl;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -114,4 +129,57 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GetScmBlockLocationsResponseProto getScmBlockLocations(
|
||||||
|
RpcController controller, GetScmBlockLocationsRequestProto req)
|
||||||
|
throws ServiceException {
|
||||||
|
Set<String> keys = Sets.newLinkedHashSetWithExpectedSize(
|
||||||
|
req.getKeysCount());
|
||||||
|
for (String key : req.getKeysList()) {
|
||||||
|
keys.add(key);
|
||||||
|
}
|
||||||
|
final Set<AllocatedBlock> blocks;
|
||||||
|
try {
|
||||||
|
blocks = blockImpl.getBlockLocations(keys);
|
||||||
|
} catch (IOException ex) {
|
||||||
|
throw new ServiceException(ex);
|
||||||
|
}
|
||||||
|
GetScmBlockLocationsResponseProto.Builder resp =
|
||||||
|
GetScmBlockLocationsResponseProto.newBuilder();
|
||||||
|
for (AllocatedBlock block: blocks) {
|
||||||
|
ScmLocatedBlockProto.Builder locatedBlock =
|
||||||
|
ScmLocatedBlockProto.newBuilder()
|
||||||
|
.setKey(block.getKey())
|
||||||
|
.setPipeline(block.getPipeline().getProtobufMessage());
|
||||||
|
resp.addLocatedBlocks(locatedBlock.build());
|
||||||
|
}
|
||||||
|
return resp.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public AllocateScmBlockResponseProto allocateScmBlock(
|
||||||
|
RpcController controller, AllocateScmBlockRequestProto request)
|
||||||
|
throws ServiceException {
|
||||||
|
try {
|
||||||
|
AllocatedBlock allocatedBlock =
|
||||||
|
blockImpl.allocateBlock(request.getSize());
|
||||||
|
if (allocatedBlock != null) {
|
||||||
|
return StorageContainerLocationProtocolProtos
|
||||||
|
.AllocateScmBlockResponseProto.newBuilder()
|
||||||
|
.setKey(allocatedBlock.getKey())
|
||||||
|
.setPipeline(allocatedBlock.getPipeline().getProtobufMessage())
|
||||||
|
.setCreateContainer(allocatedBlock.getCreateContainer())
|
||||||
|
.setErrorCode(AllocateScmBlockResponseProto.Error.success)
|
||||||
|
.build();
|
||||||
|
} else {
|
||||||
|
return StorageContainerLocationProtocolProtos
|
||||||
|
.AllocateScmBlockResponseProto.newBuilder()
|
||||||
|
.setErrorCode(AllocateScmBlockResponseProto.Error.unknownFailure)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,7 +30,11 @@ import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.metrics2.util.MBeans;
|
import org.apache.hadoop.metrics2.util.MBeans;
|
||||||
import org.apache.hadoop.ozone.OzoneClientUtils;
|
import org.apache.hadoop.ozone.OzoneClientUtils;
|
||||||
import org.apache.hadoop.ozone.OzoneConfiguration;
|
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||||
|
import org.apache.hadoop.ozone.scm.block.BlockManager;
|
||||||
|
import org.apache.hadoop.ozone.scm.block.BlockManagerImpl;
|
||||||
import org.apache.hadoop.scm.client.ScmClient;
|
import org.apache.hadoop.scm.client.ScmClient;
|
||||||
|
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
|
||||||
|
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
|
||||||
import org.apache.hadoop.scm.protocol.LocatedContainer;
|
import org.apache.hadoop.scm.protocol.LocatedContainer;
|
||||||
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
|
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
|
||||||
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
|
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
|
||||||
|
@ -80,12 +84,13 @@ import org.slf4j.LoggerFactory;
|
||||||
import javax.management.ObjectName;
|
import javax.management.ObjectName;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.HashMap;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.scm.ScmConfigKeys
|
import static org.apache.hadoop.scm.ScmConfigKeys
|
||||||
.OZONE_SCM_CLIENT_ADDRESS_KEY;
|
.OZONE_SCM_CLIENT_ADDRESS_KEY;
|
||||||
|
@ -114,7 +119,7 @@ import static org.apache.hadoop.util.ExitUtil.terminate;
|
||||||
@InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"})
|
@InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"})
|
||||||
public class StorageContainerManager
|
public class StorageContainerManager
|
||||||
implements StorageContainerDatanodeProtocol,
|
implements StorageContainerDatanodeProtocol,
|
||||||
StorageContainerLocationProtocol, SCMMXBean {
|
StorageContainerLocationProtocol, ScmBlockLocationProtocol, SCMMXBean{
|
||||||
|
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(StorageContainerManager.class);
|
LoggerFactory.getLogger(StorageContainerManager.class);
|
||||||
|
@ -124,6 +129,7 @@ public class StorageContainerManager
|
||||||
*/
|
*/
|
||||||
private final NodeManager scmNodeManager;
|
private final NodeManager scmNodeManager;
|
||||||
private final Mapping scmContainerManager;
|
private final Mapping scmContainerManager;
|
||||||
|
private final BlockManager scmBlockManager;
|
||||||
|
|
||||||
/** The RPC server that listens to requests from DataNodes. */
|
/** The RPC server that listens to requests from DataNodes. */
|
||||||
private final RPC.Server datanodeRpcServer;
|
private final RPC.Server datanodeRpcServer;
|
||||||
|
@ -153,6 +159,8 @@ public class StorageContainerManager
|
||||||
// TODO : Fix the ClusterID generation code.
|
// TODO : Fix the ClusterID generation code.
|
||||||
scmNodeManager = new SCMNodeManager(conf, UUID.randomUUID().toString());
|
scmNodeManager = new SCMNodeManager(conf, UUID.randomUUID().toString());
|
||||||
scmContainerManager = new ContainerMapping(conf, scmNodeManager, cacheSize);
|
scmContainerManager = new ContainerMapping(conf, scmNodeManager, cacheSize);
|
||||||
|
scmBlockManager = new BlockManagerImpl(conf, scmNodeManager,
|
||||||
|
scmContainerManager, cacheSize);
|
||||||
|
|
||||||
RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class,
|
RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class,
|
||||||
ProtobufRpcEngine.class);
|
ProtobufRpcEngine.class);
|
||||||
|
@ -176,7 +184,7 @@ public class StorageContainerManager
|
||||||
.StorageContainerLocationProtocolService
|
.StorageContainerLocationProtocolService
|
||||||
.newReflectiveBlockingService(
|
.newReflectiveBlockingService(
|
||||||
new StorageContainerLocationProtocolServerSideTranslatorPB(
|
new StorageContainerLocationProtocolServerSideTranslatorPB(
|
||||||
this));
|
this, this));
|
||||||
|
|
||||||
final InetSocketAddress scmAddress =
|
final InetSocketAddress scmAddress =
|
||||||
OzoneClientUtils.getScmClientBindAddress(conf);
|
OzoneClientUtils.getScmClientBindAddress(conf);
|
||||||
|
@ -431,6 +439,7 @@ public class StorageContainerManager
|
||||||
datanodeRpcServer.stop();
|
datanodeRpcServer.stop();
|
||||||
unregisterMXBean();
|
unregisterMXBean();
|
||||||
IOUtils.closeQuietly(scmContainerManager);
|
IOUtils.closeQuietly(scmContainerManager);
|
||||||
|
IOUtils.closeQuietly(scmBlockManager);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -540,4 +549,36 @@ public class StorageContainerManager
|
||||||
return scmNodeManager;
|
return scmNodeManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get block locations.
|
||||||
|
* @param keys batch of block keys to retrieve.
|
||||||
|
* @return set of allocated blocks.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public Set<AllocatedBlock> getBlockLocations(final Set<String> keys)
|
||||||
|
throws IOException {
|
||||||
|
Set<AllocatedBlock> locatedBlocks = new HashSet<>();
|
||||||
|
for (String key: keys) {
|
||||||
|
Pipeline pipeline = scmBlockManager.getBlock(key);
|
||||||
|
AllocatedBlock block = new AllocatedBlock.Builder()
|
||||||
|
.setKey(key)
|
||||||
|
.setPipeline(pipeline).build();
|
||||||
|
locatedBlocks.add(block);
|
||||||
|
}
|
||||||
|
return locatedBlocks;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Asks SCM where a block should be allocated. SCM responds with the set
|
||||||
|
* of datanodes and leader that should be used creating this block.
|
||||||
|
*
|
||||||
|
* @param size - size of the block.
|
||||||
|
* @return - allocated block accessing info (key, pipeline and leader).
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public AllocatedBlock allocateBlock(final long size) throws IOException {
|
||||||
|
return scmBlockManager.allocateBlock(size);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,54 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ozone.scm.block;
|
||||||
|
|
||||||
|
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
|
||||||
|
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Block APIs.
|
||||||
|
* Container is transparent to these APIs.
|
||||||
|
*/
|
||||||
|
public interface BlockManager extends Closeable {
|
||||||
|
/**
|
||||||
|
* Allocates a new block for a given size.
|
||||||
|
* @param size - size of the block to be allocated
|
||||||
|
* @return - the allocated pipeline and key for the block
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
AllocatedBlock allocateBlock(long size) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Give the key to the block, get the pipeline info.
|
||||||
|
* @param key - key to the block.
|
||||||
|
* @return - Pipeline that used to access the block.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
Pipeline getBlock(String key) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Given a key of the block, delete the block.
|
||||||
|
* @param key - key of the block.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
void deleteBlock(String key) throws IOException;
|
||||||
|
}
|
|
@ -0,0 +1,325 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ozone.scm.block;
|
||||||
|
|
||||||
|
import org.apache.commons.lang.NotImplementedException;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||||
|
import org.apache.hadoop.ozone.OzoneConsts;
|
||||||
|
import org.apache.hadoop.ozone.scm.container.Mapping;
|
||||||
|
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
|
||||||
|
import org.apache.hadoop.ozone.scm.node.NodeManager;
|
||||||
|
import org.apache.hadoop.scm.ScmConfigKeys;
|
||||||
|
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
|
||||||
|
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||||
|
import org.apache.hadoop.utils.LevelDBStore;
|
||||||
|
import org.iq80.leveldb.DBIterator;
|
||||||
|
import org.iq80.leveldb.Options;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.ozone.scm.exceptions.SCMException
|
||||||
|
.ResultCodes.CHILL_MODE_EXCEPTION;
|
||||||
|
import static org.apache.hadoop.ozone.scm.exceptions.SCMException
|
||||||
|
.ResultCodes.FAILED_TO_ALLOCATE_CONTAINER;
|
||||||
|
import static org.apache.hadoop.ozone.scm.exceptions.SCMException
|
||||||
|
.ResultCodes.FAILED_TO_FIND_CONTAINER;
|
||||||
|
import static org.apache.hadoop.ozone.scm.exceptions.SCMException
|
||||||
|
.ResultCodes.FAILED_TO_FIND_CONTAINER_WITH_SAPCE;
|
||||||
|
import static org.apache.hadoop.ozone.scm.exceptions.SCMException
|
||||||
|
.ResultCodes.FAILED_TO_LOAD_OPEN_CONTAINER;
|
||||||
|
import static org.apache.hadoop.ozone.scm.exceptions.SCMException
|
||||||
|
.ResultCodes.INVALID_BLOCK_SIZE;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Block Manager manages the block access for SCM.
|
||||||
|
*/
|
||||||
|
public class BlockManagerImpl implements BlockManager {
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(BlockManagerImpl.class);
|
||||||
|
|
||||||
|
private final NodeManager nodeManager;
|
||||||
|
private final Mapping containerManager;
|
||||||
|
private final LevelDBStore blockStore;
|
||||||
|
|
||||||
|
private final Lock lock;
|
||||||
|
private final long containerSize;
|
||||||
|
private final long cacheSize;
|
||||||
|
|
||||||
|
private final LevelDBStore openContainerStore;
|
||||||
|
private Map<String, Long> openContainers;
|
||||||
|
private final int containerProvisionBatchSize;
|
||||||
|
private final Random rand;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor.
|
||||||
|
* @param conf - configuration.
|
||||||
|
* @param nodeManager - node manager.
|
||||||
|
* @param containerManager - container manager.
|
||||||
|
* @param cacheSizeMB - cache size for level db store.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public BlockManagerImpl(final Configuration conf,
|
||||||
|
final NodeManager nodeManager, final Mapping containerManager,
|
||||||
|
final int cacheSizeMB) throws IOException {
|
||||||
|
this.nodeManager = nodeManager;
|
||||||
|
this.containerManager = containerManager;
|
||||||
|
this.cacheSize = cacheSizeMB;
|
||||||
|
String scmMetaDataDir = conf.get(OzoneConfigKeys
|
||||||
|
.OZONE_CONTAINER_METADATA_DIRS);
|
||||||
|
if ((scmMetaDataDir == null) || (scmMetaDataDir.isEmpty())) {
|
||||||
|
throw
|
||||||
|
new IllegalArgumentException("SCM metadata directory is not valid.");
|
||||||
|
}
|
||||||
|
Options options = new Options();
|
||||||
|
options.cacheSize(this.cacheSize * OzoneConsts.MB);
|
||||||
|
options.createIfMissing();
|
||||||
|
|
||||||
|
// Write the block key to container name mapping.
|
||||||
|
File blockContainerDbPath = new File(scmMetaDataDir, "block.db");
|
||||||
|
blockStore = new LevelDBStore(blockContainerDbPath, options);
|
||||||
|
|
||||||
|
this.containerSize = OzoneConsts.GB * conf.getInt(
|
||||||
|
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB,
|
||||||
|
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT);
|
||||||
|
|
||||||
|
// Load store of all open contains for block allocation
|
||||||
|
File openContainsDbPath = new File(scmMetaDataDir, "openContainers.db");
|
||||||
|
openContainerStore = new LevelDBStore(openContainsDbPath, options);
|
||||||
|
openContainers = new HashMap<>();
|
||||||
|
loadOpenContainers();
|
||||||
|
|
||||||
|
this.containerProvisionBatchSize = conf.getInt(
|
||||||
|
ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE,
|
||||||
|
ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT);
|
||||||
|
rand = new Random();
|
||||||
|
this.lock = new ReentrantLock();
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: close full (or almost full) containers with a separate thread.
|
||||||
|
/**
|
||||||
|
* Load open containers from persistent store.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private void loadOpenContainers() throws IOException {
|
||||||
|
try (DBIterator iter = openContainerStore.getIterator()) {
|
||||||
|
for (iter.seekToFirst(); iter.hasNext(); iter.next()) {
|
||||||
|
try {
|
||||||
|
byte[] key = iter.peekNext().getKey();
|
||||||
|
String containerName = DFSUtil.bytes2String(key);
|
||||||
|
byte[] value = iter.peekNext().getValue();
|
||||||
|
Long containerUsed = Long.parseLong(DFSUtil.bytes2String(value));
|
||||||
|
openContainers.put(containerName, containerUsed);
|
||||||
|
LOG.debug("Loading open container: {} used : {}", containerName,
|
||||||
|
containerUsed);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
LOG.warn("Failed loading open container, continue next...");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Loading open container store failed." + e);
|
||||||
|
throw new SCMException("Failed to load open container store",
|
||||||
|
FAILED_TO_LOAD_OPEN_CONTAINER);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pre-provision specified count of containers for block creation.
|
||||||
|
* @param count - number of containers to create.
|
||||||
|
* @return list of container names created.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private List<String> provisionContainers(int count) throws IOException {
|
||||||
|
List<String> results = new ArrayList();
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
for (int i = 0; i < count; i++) {
|
||||||
|
String containerName = UUID.randomUUID().toString();
|
||||||
|
try {
|
||||||
|
Pipeline pipeline = containerManager.allocateContainer(containerName);
|
||||||
|
if (pipeline == null) {
|
||||||
|
LOG.warn("Unable to allocate container.");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
} catch (IOException ex) {
|
||||||
|
LOG.warn("Unable to allocate container: " + ex);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
openContainers.put(containerName, 0L);
|
||||||
|
openContainerStore.put(DFSUtil.string2Bytes(containerName),
|
||||||
|
DFSUtil.string2Bytes(Long.toString(0L)));
|
||||||
|
results.add(containerName);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allocates a new block for a given size.
|
||||||
|
*
|
||||||
|
* SCM choose one of the open containers and returns that as the location for
|
||||||
|
* the new block. An open container is a container that is actively written to
|
||||||
|
* via replicated log.
|
||||||
|
* @param size - size of the block to be allocated
|
||||||
|
* @return - the allocated pipeline and key for the block
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public AllocatedBlock allocateBlock(final long size) throws IOException {
|
||||||
|
boolean createContainer = false;
|
||||||
|
Pipeline pipeline;
|
||||||
|
if (size < 0 || size > containerSize) {
|
||||||
|
throw new SCMException("Unsupported block size",
|
||||||
|
INVALID_BLOCK_SIZE);
|
||||||
|
}
|
||||||
|
if (!nodeManager.isOutOfNodeChillMode()) {
|
||||||
|
throw new SCMException("Unable to create block while in chill mode",
|
||||||
|
CHILL_MODE_EXCEPTION);
|
||||||
|
}
|
||||||
|
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
List<String> candidates;
|
||||||
|
if (openContainers.size() == 0) {
|
||||||
|
try {
|
||||||
|
candidates = provisionContainers(containerProvisionBatchSize);
|
||||||
|
} catch (IOException ex) {
|
||||||
|
throw new SCMException("Unable to allocate container for the block",
|
||||||
|
FAILED_TO_ALLOCATE_CONTAINER);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
candidates = openContainers.entrySet().parallelStream()
|
||||||
|
.filter(e -> (e.getValue() + size < containerSize))
|
||||||
|
.map(e -> e.getKey())
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (candidates.size() == 0) {
|
||||||
|
try {
|
||||||
|
candidates = provisionContainers(containerProvisionBatchSize);
|
||||||
|
} catch (IOException ex) {
|
||||||
|
throw new SCMException("Unable to allocate container for the block",
|
||||||
|
FAILED_TO_ALLOCATE_CONTAINER);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (candidates.size() == 0) {
|
||||||
|
throw new SCMException("Fail to find any container to allocate block " +
|
||||||
|
"of size " + size + ".", FAILED_TO_FIND_CONTAINER_WITH_SAPCE);
|
||||||
|
}
|
||||||
|
|
||||||
|
int randomIdx = rand.nextInt(candidates.size());
|
||||||
|
String containerName = candidates.get(randomIdx);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Find {} candidates: {}, picking: {}", candidates.size(),
|
||||||
|
candidates.toString(), containerName);
|
||||||
|
}
|
||||||
|
|
||||||
|
pipeline = containerManager.getContainer(containerName);
|
||||||
|
if (pipeline == null) {
|
||||||
|
LOG.debug("Unable to find container for the block");
|
||||||
|
throw new SCMException("Unable to find container to allocate block",
|
||||||
|
FAILED_TO_FIND_CONTAINER);
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: make block key easier to debug (e.g., seq no)
|
||||||
|
// Allocate key for the block
|
||||||
|
String blockKey = UUID.randomUUID().toString();
|
||||||
|
AllocatedBlock.Builder abb = new AllocatedBlock.Builder()
|
||||||
|
.setKey(blockKey).setPipeline(pipeline)
|
||||||
|
.setShouldCreateContainer(createContainer);
|
||||||
|
if (pipeline.getMachines().size() > 0) {
|
||||||
|
blockStore.put(DFSUtil.string2Bytes(blockKey),
|
||||||
|
DFSUtil.string2Bytes(containerName));
|
||||||
|
|
||||||
|
// update the container usage information
|
||||||
|
Long newUsed = openContainers.get(containerName) + size;
|
||||||
|
openContainers.put(containerName, newUsed);
|
||||||
|
openContainerStore.put(DFSUtil.string2Bytes(containerName),
|
||||||
|
DFSUtil.string2Bytes(Long.toString(newUsed)));
|
||||||
|
return abb.build();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Given a block key, return the Pipeline information.
|
||||||
|
* @param key - block key assigned by SCM.
|
||||||
|
* @return Pipeline (list of DNs and leader) to access the block.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public Pipeline getBlock(final String key) throws IOException {
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
byte[] containerBytes = blockStore.get(DFSUtil.string2Bytes(key));
|
||||||
|
if (containerBytes == null) {
|
||||||
|
throw new IOException("Specified block key does not exist. key : " +
|
||||||
|
key);
|
||||||
|
}
|
||||||
|
return containerManager.getContainer(
|
||||||
|
DFSUtil.bytes2String(containerBytes));
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Given a block key, delete a block.
|
||||||
|
* @param key - block key assigned by SCM.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void deleteBlock(final String key) throws IOException {
|
||||||
|
throw new NotImplementedException("deleteBlock is not supported");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Close the resources for BlockManager.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
if (blockStore != null) {
|
||||||
|
blockStore.close();
|
||||||
|
}
|
||||||
|
if (openContainerStore != null) {
|
||||||
|
openContainerStore.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,22 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ozone.scm.block;
|
||||||
|
/**
|
||||||
|
* This package contains routines to manage the block location and
|
||||||
|
* mapping inside SCM
|
||||||
|
*/
|
|
@ -224,6 +224,10 @@ public class ContainerMapping implements Mapping {
|
||||||
pipeline = newPipelineFromNodes(datanodes, containerName);
|
pipeline = newPipelineFromNodes(datanodes, containerName);
|
||||||
containerStore.put(containerName.getBytes(encoding),
|
containerStore.put(containerName.getBytes(encoding),
|
||||||
pipeline.getProtobufMessage().toByteArray());
|
pipeline.getProtobufMessage().toByteArray());
|
||||||
|
} else {
|
||||||
|
LOG.debug("Unable to find enough datanodes for new container. " +
|
||||||
|
"Required {} found {}", replicationFactor,
|
||||||
|
datanodes != null ? datanodes.size(): 0);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
|
|
|
@ -100,6 +100,12 @@ public class SCMException extends IOException {
|
||||||
FAILED_TO_FIND_HEALTHY_NODES,
|
FAILED_TO_FIND_HEALTHY_NODES,
|
||||||
FAILED_TO_FIND_NODES_WITH_SPACE,
|
FAILED_TO_FIND_NODES_WITH_SPACE,
|
||||||
FAILED_TO_FIND_SUITABLE_NODE,
|
FAILED_TO_FIND_SUITABLE_NODE,
|
||||||
INVALID_CAPACITY
|
INVALID_CAPACITY,
|
||||||
|
INVALID_BLOCK_SIZE,
|
||||||
|
CHILL_MODE_EXCEPTION,
|
||||||
|
FAILED_TO_LOAD_OPEN_CONTAINER,
|
||||||
|
FAILED_TO_ALLOCATE_CONTAINER,
|
||||||
|
FAILED_TO_FIND_CONTAINER,
|
||||||
|
FAILED_TO_FIND_CONTAINER_WITH_SAPCE,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,125 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ozone.scm.block;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
|
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||||
|
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
|
||||||
|
import org.apache.hadoop.ozone.scm.container.ContainerMapping;
|
||||||
|
import org.apache.hadoop.ozone.scm.container.MockNodeManager;
|
||||||
|
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
|
||||||
|
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.ExpectedException;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URL;
|
||||||
|
import java.nio.file.Paths;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.ozone.OzoneConsts.GB;
|
||||||
|
import static org.apache.hadoop.ozone.OzoneConsts.MB;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests for SCM Block Manager.
|
||||||
|
*/
|
||||||
|
public class TestBlockManager {
|
||||||
|
private static ContainerMapping mapping;
|
||||||
|
private static MockNodeManager nodeManager;
|
||||||
|
private static BlockManager blockManager;
|
||||||
|
private static File testDir;
|
||||||
|
private final static long DEFAULT_BLOCK_SIZE = 128 * MB;
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public ExpectedException thrown = ExpectedException.none();
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUp() throws Exception {
|
||||||
|
Configuration conf = SCMTestUtils.getConf();
|
||||||
|
|
||||||
|
URL p = conf.getClass().getResource("");
|
||||||
|
String path = p.getPath().concat(
|
||||||
|
org.apache.hadoop.ozone.scm.container.TestContainerMapping
|
||||||
|
.class.getSimpleName());
|
||||||
|
|
||||||
|
conf.set(OzoneConfigKeys.OZONE_CONTAINER_METADATA_DIRS, path);
|
||||||
|
testDir = Paths.get(path).toFile();
|
||||||
|
boolean folderExisted = testDir.exists() || testDir.mkdirs();
|
||||||
|
if (!folderExisted) {
|
||||||
|
throw new IOException("Unable to create test diectory path");
|
||||||
|
}
|
||||||
|
nodeManager = new MockNodeManager(true, 10);
|
||||||
|
mapping = new ContainerMapping(conf, nodeManager, 128);
|
||||||
|
blockManager = new BlockManagerImpl(conf, nodeManager, mapping, 128);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void cleanup() throws IOException {
|
||||||
|
blockManager.close();
|
||||||
|
mapping.close();
|
||||||
|
FileUtil.fullyDelete(testDir);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void clearChillMode() {
|
||||||
|
nodeManager.setChillmode(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAllocateBlock() throws Exception {
|
||||||
|
AllocatedBlock block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE);
|
||||||
|
Assert.assertNotNull(block);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetAllocatedBlock() throws IOException {
|
||||||
|
AllocatedBlock block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE);
|
||||||
|
Assert.assertNotNull(block);
|
||||||
|
Pipeline pipeline = blockManager.getBlock(block.getKey());
|
||||||
|
Assert.assertEquals(pipeline.getLeader().getDatanodeUuid(),
|
||||||
|
block.getPipeline().getLeader().getDatanodeUuid());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAllocateOversidedBlock() throws IOException {
|
||||||
|
long size = 6 * GB;
|
||||||
|
thrown.expectMessage("Unsupported block size");
|
||||||
|
AllocatedBlock block = blockManager.allocateBlock(size);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetNoneExistentContainer() throws IOException {
|
||||||
|
String nonExistBlockKey = UUID.randomUUID().toString();
|
||||||
|
thrown.expectMessage("Specified block key does not exist.");
|
||||||
|
blockManager.getBlock(nonExistBlockKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testChillModeAllocateBlockFails() throws IOException {
|
||||||
|
nodeManager.setChillmode(true);
|
||||||
|
thrown.expectMessage("Unable to create block while in chill mode");
|
||||||
|
blockManager.allocateBlock(DEFAULT_BLOCK_SIZE);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue