HDFS-11859. Ozone: SCM: Separate BlockLocationProtocol from ContainerLocationProtocol. Contributed by Xiaoyu Yao.

This commit is contained in:
Xiaoyu Yao 2017-05-23 10:56:23 -07:00 committed by Owen O'Malley
parent 0753e094d7
commit 47c4867765
13 changed files with 624 additions and 195 deletions

View File

@ -53,10 +53,17 @@ public final class ScmConfigKeys {
public static final String OZONE_SCM_CLIENT_PORT_KEY =
"ozone.scm.client.port";
public static final int OZONE_SCM_CLIENT_PORT_DEFAULT = 9860;
public static final String OZONE_SCM_DATANODE_PORT_KEY =
"ozone.scm.datanode.port";
public static final int OZONE_SCM_DATANODE_PORT_DEFAULT = 9861;
// OZONE_KSM_PORT_DEFAULT = 9862
public static final String OZONE_SCM_BLOCK_CLIENT_PORT_KEY =
"ozone.scm.block.client.port";
public static final int OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT = 9863;
// Container service client
public static final String OZONE_SCM_CLIENT_ADDRESS_KEY =
"ozone.scm.client.address";
public static final String OZONE_SCM_CLIENT_BIND_HOST_KEY =
@ -64,6 +71,14 @@ public final class ScmConfigKeys {
public static final String OZONE_SCM_CLIENT_BIND_HOST_DEFAULT =
"0.0.0.0";
// Block service client
public static final String OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY =
"ozone.scm.block.client.address";
public static final String OZONE_SCM_BLOCK_CLIENT_BIND_HOST_KEY =
"ozone.scm.block.client.bind.host";
public static final String OZONE_SCM_BLOCK_CLIENT_BIND_HOST_DEFAULT =
"0.0.0.0";
public static final String OZONE_SCM_DATANODE_ADDRESS_KEY =
"ozone.scm.datanode.address";
public static final String OZONE_SCM_DATANODE_BIND_HOST_KEY =

View File

@ -19,7 +19,7 @@ package org.apache.hadoop.scm.container.common.helpers;
import static org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.DeleteScmBlockResult;
.ScmBlockLocationProtocolProtos.DeleteScmBlockResult;
/**
* Class wraps storage container manager block deletion results.

View File

@ -60,6 +60,6 @@ public interface ScmBlockLocationProtocol {
* @throws IOException if there is any failure.
*
*/
List<DeleteBlockResult> deleteBlocks(Set<String> keys);
List<DeleteBlockResult> deleteBlocks(Set<String> keys) throws IOException;
}

View File

@ -0,0 +1,186 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.scm.protocolPB;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
.AllocateScmBlockRequestProto;
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
.AllocateScmBlockResponseProto;
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
.DeleteScmBlocksRequestProto;
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
.DeleteScmBlocksResponseProto;
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
.DeleteScmBlockResult;
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
.GetScmBlockLocationsRequestProto;
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
.GetScmBlockLocationsResponseProto;
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
.ScmLocatedBlockProto;
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
/**
* This class is the client-side translator to translate the requests made on
* the {@link ScmBlockLocationProtocol} interface to the RPC server
* implementing {@link ScmBlockLocationProtocolPB}.
*/
@InterfaceAudience.Private
public final class ScmBlockLocationProtocolClientSideTranslatorPB
implements ScmBlockLocationProtocol, ProtocolTranslator, Closeable {
/**
* RpcController is not used and hence is set to null.
*/
private static final RpcController NULL_RPC_CONTROLLER = null;
private final ScmBlockLocationProtocolPB rpcProxy;
/**
* Creates a new StorageContainerLocationProtocolClientSideTranslatorPB.
*
* @param rpcProxy {@link StorageContainerLocationProtocolPB} RPC proxy
*/
public ScmBlockLocationProtocolClientSideTranslatorPB(
ScmBlockLocationProtocolPB rpcProxy) {
this.rpcProxy = rpcProxy;
}
/**
* Find the set of nodes to read/write a block, as
* identified by the block key. This method supports batch lookup by
* passing multiple keys.
*
* @param keys batch of block keys to find
* @return allocated blocks for each block key
* @throws IOException if there is any failure
*/
@Override
public Set<AllocatedBlock> getBlockLocations(Set<String> keys)
throws IOException {
GetScmBlockLocationsRequestProto.Builder req =
GetScmBlockLocationsRequestProto.newBuilder();
for (String key : keys) {
req.addKeys(key);
}
final GetScmBlockLocationsResponseProto resp;
try {
resp = rpcProxy.getScmBlockLocations(NULL_RPC_CONTROLLER,
req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
Set<AllocatedBlock> locatedBlocks =
Sets.newLinkedHashSetWithExpectedSize(resp.getLocatedBlocksCount());
for (ScmLocatedBlockProto locatedBlock : resp.getLocatedBlocksList()) {
locatedBlocks.add(new AllocatedBlock.Builder()
.setKey(locatedBlock.getKey())
.setPipeline(Pipeline.getFromProtoBuf(locatedBlock.getPipeline()))
.build());
}
return locatedBlocks;
}
/**
* Asks SCM where a block should be allocated. SCM responds with the
* set of datanodes that should be used creating this block.
* @param size - size of the block.
* @return allocated block accessing info (key, pipeline).
* @throws IOException
*/
@Override
public AllocatedBlock allocateBlock(long size) throws IOException {
Preconditions.checkArgument(size > 0,
"block size must be greater than 0");
AllocateScmBlockRequestProto request = AllocateScmBlockRequestProto
.newBuilder().setSize(size).build();
final AllocateScmBlockResponseProto response;
try {
response = rpcProxy.allocateScmBlock(NULL_RPC_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
if (response.getErrorCode() !=
AllocateScmBlockResponseProto.Error.success) {
throw new IOException(response.hasErrorMessage() ?
response.getErrorMessage() : "Allocate block failed.");
}
AllocatedBlock.Builder builder = new AllocatedBlock.Builder()
.setKey(response.getKey())
.setPipeline(Pipeline.getFromProtoBuf(response.getPipeline()))
.setShouldCreateContainer(response.getCreateContainer());
return builder.build();
}
/**
* Delete the set of keys specified.
*
* @param keys batch of block keys to delete.
* @return list of block deletion results.
* @throws IOException if there is any failure.
*
*/
@Override
public List<DeleteBlockResult> deleteBlocks(Set<String> keys)
throws IOException {
Preconditions.checkArgument(keys == null || keys.isEmpty(),
"keys to be deleted cannot be null or empty");
DeleteScmBlocksRequestProto request = DeleteScmBlocksRequestProto
.newBuilder()
.addAllKeys(keys)
.build();
final DeleteScmBlocksResponseProto resp;
try {
resp = rpcProxy.deleteScmBlocks(NULL_RPC_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
List<DeleteBlockResult> results = new ArrayList(resp.getResultsCount());
for (DeleteScmBlockResult result : resp.getResultsList()) {
results.add(new DeleteBlockResult(result.getKey(), result.getResult()));
}
return results;
}
@Override
public Object getUnderlyingProxyObject() {
return rpcProxy;
}
@Override
public void close() {
RPC.stopProxy(rpcProxy);
}
}

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.scm.protocolPB;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
.ScmBlockLocationProtocolService;
/**
* Protocol used from an HDFS node to StorageContainerManager. This extends the
* Protocol Buffers service interface to add Hadoop-specific annotations.
*/
@ProtocolInfo(protocolName =
"org.apache.hadoop.ozone.protocol.ScmBlockLocationProtocol",
protocolVersion = 1)
@InterfaceAudience.Private
public interface ScmBlockLocationProtocolPB
extends ScmBlockLocationProtocolService.BlockingInterface {
}

View File

@ -0,0 +1,133 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* These .proto interfaces are private and unstable.
* Please see http://wiki.apache.org/hadoop/Compatibility
* for what changes are allowed for a *unstable* .proto interface.
*/
option java_package = "org.apache.hadoop.ozone.protocol.proto";
option java_outer_classname = "ScmBlockLocationProtocolProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.hdfs;
import "hdfs.proto";
import "Ozone.proto";
// SCM Block protocol
/**
* keys - batch of block keys to find
*/
message GetScmBlockLocationsRequestProto {
repeated string keys = 1;
}
/**
* locatedBlocks - for each requested hash, nodes that currently host the
* container for that object key hash
*/
message GetScmBlockLocationsResponseProto {
repeated ScmLocatedBlockProto locatedBlocks = 1;
}
/**
* Holds the nodes that currently host the blocks for a key.
*/
message ScmLocatedBlockProto {
required string key = 1;
required hadoop.hdfs.ozone.Pipeline pipeline = 2;
}
/**
* Request send to SCM asking allocate block of specified size.
*/
message AllocateScmBlockRequestProto {
required uint64 size = 1;
}
/**
* keys - batch of block keys to deleted
*/
message DeleteScmBlocksRequestProto {
repeated string keys = 1;
}
/**
* deletedKeys - keys that are deleted successfully
*/
message DeleteScmBlocksResponseProto {
repeated DeleteScmBlockResult results = 1;
}
message DeleteScmBlockResult {
enum Result {
success = 1;
chillMode = 2;
errorNotFound = 3;
unknownFailure = 4;
}
required Result result = 1;
required string key = 2;
}
/**
* Reply from SCM indicating that the container.
*/
message AllocateScmBlockResponseProto {
enum Error {
success = 1;
errorNotEnoughSpace = 2;
errorSizeTooBig = 3;
unknownFailure = 4;
}
required Error errorCode = 1;
required string key = 2;
required hadoop.hdfs.ozone.Pipeline pipeline = 3;
required bool createContainer = 4;
optional string errorMessage = 5;
}
/**
* Protocol used from KeySpaceManager to StorageContainerManager.
* See request and response messages for details of the RPC calls.
*/
service ScmBlockLocationProtocolService {
/**
* Find the set of nodes that currently host the block, as
* identified by the key. This method supports batch lookup by
* passing multiple keys.
*/
rpc getScmBlockLocations(GetScmBlockLocationsRequestProto)
returns (GetScmBlockLocationsResponseProto);
/**
* Creates a block entry in SCM.
*/
rpc allocateScmBlock(AllocateScmBlockRequestProto)
returns (AllocateScmBlockResponseProto);
/**
* Deletes one or multiple block keys from SCM.
*/
rpc deleteScmBlocks(DeleteScmBlocksRequestProto)
returns (DeleteScmBlocksResponseProto);
}

View File

@ -100,78 +100,6 @@ message DeleteContainerResponseProto {
// Empty response
}
// SCM Block protocol
/**
* keys - batch of block keys to find
*/
message GetScmBlockLocationsRequestProto {
repeated string keys = 1;
}
/**
* locatedBlocks - for each requested hash, nodes that currently host the
* container for that object key hash
*/
message GetScmBlockLocationsResponseProto {
repeated ScmLocatedBlockProto locatedBlocks = 1;
}
/**
* Holds the nodes that currently host the blocks for a key.
*/
message ScmLocatedBlockProto {
required string key = 1;
required hadoop.hdfs.ozone.Pipeline pipeline = 2;
}
/**
* Request send to SCM asking allocate block of specified size.
*/
message AllocateScmBlockRequestProto {
required uint64 size = 1;
}
/**
* keys - batch of block keys to deleted
*/
message DeleteScmBlocksRequestProto {
repeated string keys = 1;
}
/**
* deletedKeys - keys that are deleted successfully
*/
message DeleteScmBlocksResponseProto {
repeated DeleteScmBlockResult results = 1;
}
message DeleteScmBlockResult {
enum Result {
success = 1;
chillMode = 2;
errorNotFound = 3;
unknownFailure = 4;
}
required Result result = 1;
required string key = 2;
}
/**
* Reply from SCM indicating that the container.
*/
message AllocateScmBlockResponseProto {
enum Error {
success = 1;
errorNotEnoughSpace = 2;
errorSizeTooBig = 3;
unknownFailure = 4;
}
required Error errorCode = 1;
required string key = 2;
required hadoop.hdfs.ozone.Pipeline pipeline = 3;
required bool createContainer = 4;
optional string errorMessage = 5;
}
/**
* Protocol used from an HDFS node to StorageContainerManager. See the request
@ -200,24 +128,4 @@ service StorageContainerLocationProtocolService {
* Deletes a container in SCM.
*/
rpc deleteContainer(DeleteContainerRequestProto) returns (DeleteContainerResponseProto);
/**
* Find the set of nodes that currently host the block, as
* identified by the key. This method supports batch lookup by
* passing multiple keys.
*/
rpc getScmBlockLocations(GetScmBlockLocationsRequestProto)
returns (GetScmBlockLocationsResponseProto);
/**
* Creates a block entry in SCM.
*/
rpc allocateScmBlock(AllocateScmBlockRequestProto)
returns (AllocateScmBlockResponseProto);
/**
* Deletes one or multiple block keys from SCM.
*/
rpc deleteScmBlocks(DeleteScmBlocksRequestProto)
returns (DeleteScmBlocksResponseProto);
}

View File

@ -35,6 +35,8 @@ import org.apache.hadoop.ksm.protocolPB
import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB;
import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -69,6 +71,8 @@ public final class ObjectStoreHandler implements Closeable {
keySpaceManagerClient;
private final StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;
private final ScmBlockLocationProtocolClientSideTranslatorPB
scmBlockLocationClient;
private final StorageHandler storageHandler;
/**
@ -91,6 +95,7 @@ public final class ObjectStoreHandler implements Closeable {
ProtobufRpcEngine.class);
long scmVersion =
RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);
InetSocketAddress scmAddress =
OzoneClientUtils.getScmAddressForClients(conf);
this.storageContainerLocationClient =
@ -99,6 +104,16 @@ public final class ObjectStoreHandler implements Closeable {
scmAddress, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf),
Client.getRpcTimeout(conf)));
InetSocketAddress scmBlockAddress =
OzoneClientUtils.getScmAddressForBlockClients(conf);
this.scmBlockLocationClient =
new ScmBlockLocationProtocolClientSideTranslatorPB(
RPC.getProxy(ScmBlockLocationProtocolPB.class, scmVersion,
scmBlockAddress, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf),
Client.getRpcTimeout(conf)));
long ksmVersion =
RPC.getProtocolVersion(KeySpaceManagerProtocolPB.class);
InetSocketAddress ksmAddress = OzoneClientUtils.getKsmAddress(conf);
@ -116,6 +131,7 @@ public final class ObjectStoreHandler implements Closeable {
if (OzoneConsts.OZONE_HANDLER_LOCAL.equalsIgnoreCase(shType)) {
storageHandler = new LocalStorageHandler(conf);
this.storageContainerLocationClient = null;
this.scmBlockLocationClient = null;
this.keySpaceManagerClient = null;
} else {
throw new IllegalArgumentException(
@ -162,5 +178,8 @@ public final class ObjectStoreHandler implements Closeable {
if (this.storageContainerLocationClient != null) {
this.storageContainerLocationClient.close();
}
if (this.scmBlockLocationClient != null) {
this.scmBlockLocationClient.close();
}
}
}

View File

@ -151,6 +151,33 @@ public final class OzoneClientUtils {
port.or(ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT));
}
/**
* Retrieve the socket address that should be used by clients to connect
* to the SCM for block service.
*
* @param conf
* @return Target InetSocketAddress for the SCM block client endpoint.
*/
public static InetSocketAddress getScmAddressForBlockClients(
Configuration conf) {
final Optional<String> host = getHostNameFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY);
if (!host.isPresent()) {
throw new IllegalArgumentException(
ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY +
" must be defined. See" +
" https://wiki.apache.org/hadoop/Ozone#Configuration for details" +
" on configuring Ozone.");
}
final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY);
return NetUtils.createSocketAddr(host.get() + ":" +
port.or(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT));
}
/**
* Retrieve the socket address that should be used by DataNodes to connect
* to the SCM.
@ -207,6 +234,26 @@ public final class OzoneClientUtils {
port.or(ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT));
}
/**
* Retrieve the socket address that should be used by clients to connect
* to the SCM Block service.
*
* @param conf
* @return Target InetSocketAddress for the SCM block client endpoint.
*/
public static InetSocketAddress getScmBlockClientBindAddress(
Configuration conf) {
final Optional<String> host = getHostNameFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_BIND_HOST_KEY);
final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY);
return NetUtils.createSocketAddr(
host.or(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_BIND_HOST_DEFAULT) +
":" + port.or(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT));
}
/**
* Retrieve the socket address that should be used by DataNodes to connect
* to the SCM.

View File

@ -0,0 +1,148 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.protocolPB;
import com.google.common.collect.Sets;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB;
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.ozone.protocol.proto
.ScmBlockLocationProtocolProtos.AllocateScmBlockRequestProto;
import org.apache.hadoop.ozone.protocol.proto
.ScmBlockLocationProtocolProtos.AllocateScmBlockResponseProto;
import org.apache.hadoop.ozone.protocol.proto
.ScmBlockLocationProtocolProtos.DeleteScmBlocksRequestProto;
import org.apache.hadoop.ozone.protocol.proto
.ScmBlockLocationProtocolProtos.DeleteScmBlocksResponseProto;
import static org.apache.hadoop.ozone.protocol.proto
.ScmBlockLocationProtocolProtos.DeleteScmBlockResult;
import org.apache.hadoop.ozone.protocol.proto
.ScmBlockLocationProtocolProtos.GetScmBlockLocationsRequestProto;
import org.apache.hadoop.ozone.protocol.proto
.ScmBlockLocationProtocolProtos.GetScmBlockLocationsResponseProto;
import org.apache.hadoop.ozone.protocol.proto
.ScmBlockLocationProtocolProtos.ScmLocatedBlockProto;
import java.io.IOException;
import java.util.List;
import java.util.Set;
/**
* This class is the server-side translator that forwards requests received on
* {@link StorageContainerLocationProtocolPB} to the
* {@link StorageContainerLocationProtocol} server implementation.
*/
@InterfaceAudience.Private
public final class ScmBlockLocationProtocolServerSideTranslatorPB
implements ScmBlockLocationProtocolPB {
private final ScmBlockLocationProtocol impl;
/**
* Creates a new ScmBlockLocationProtocolServerSideTranslatorPB.
*
* @param impl {@link ScmBlockLocationProtocol} server implementation
*/
public ScmBlockLocationProtocolServerSideTranslatorPB(
ScmBlockLocationProtocol impl) throws IOException {
this.impl = impl;
}
@Override
public GetScmBlockLocationsResponseProto getScmBlockLocations(
RpcController controller, GetScmBlockLocationsRequestProto req)
throws ServiceException {
Set<String> keys = Sets.newLinkedHashSetWithExpectedSize(
req.getKeysCount());
for (String key : req.getKeysList()) {
keys.add(key);
}
final Set<AllocatedBlock> blocks;
try {
blocks = impl.getBlockLocations(keys);
} catch (IOException ex) {
throw new ServiceException(ex);
}
GetScmBlockLocationsResponseProto.Builder resp =
GetScmBlockLocationsResponseProto.newBuilder();
for (AllocatedBlock block: blocks) {
ScmLocatedBlockProto.Builder locatedBlock =
ScmLocatedBlockProto.newBuilder()
.setKey(block.getKey())
.setPipeline(block.getPipeline().getProtobufMessage());
resp.addLocatedBlocks(locatedBlock.build());
}
return resp.build();
}
@Override
public AllocateScmBlockResponseProto allocateScmBlock(
RpcController controller, AllocateScmBlockRequestProto request)
throws ServiceException {
try {
AllocatedBlock allocatedBlock =
impl.allocateBlock(request.getSize());
if (allocatedBlock != null) {
return
AllocateScmBlockResponseProto.newBuilder()
.setKey(allocatedBlock.getKey())
.setPipeline(allocatedBlock.getPipeline().getProtobufMessage())
.setCreateContainer(allocatedBlock.getCreateContainer())
.setErrorCode(AllocateScmBlockResponseProto.Error.success)
.build();
} else {
return AllocateScmBlockResponseProto.newBuilder()
.setErrorCode(AllocateScmBlockResponseProto.Error.unknownFailure)
.build();
}
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public DeleteScmBlocksResponseProto deleteScmBlocks(
RpcController controller, DeleteScmBlocksRequestProto req)
throws ServiceException {
Set<String> keys = Sets.newLinkedHashSetWithExpectedSize(
req.getKeysCount());
for (String key : req.getKeysList()) {
keys.add(key);
}
DeleteScmBlocksResponseProto.Builder resp =
DeleteScmBlocksResponseProto.newBuilder();
try {
final List<DeleteBlockResult> results = impl.deleteBlocks(keys);
for (DeleteBlockResult result: results) {
DeleteScmBlockResult.Builder deleteResult = DeleteScmBlockResult
.newBuilder().setKey(result.getKey()).setResult(result.getResult());
resp.addResults(deleteResult.build());
}
} catch (IOException ex) {
throw new ServiceException(ex);
}
return resp.build();
}
}

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.ozone.protocolPB;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import com.google.common.collect.Sets;
@ -28,10 +27,7 @@ import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
import org.apache.hadoop.scm.protocol.LocatedContainer;
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos
@ -39,29 +35,12 @@ import org.apache.hadoop.ozone.protocol.proto
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos
.GetStorageContainerLocationsResponseProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos
.ScmLocatedBlockProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.LocatedContainerProto;
import static org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.ContainerRequestProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.ContainerResponseProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.AllocateScmBlockRequestProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.AllocateScmBlockResponseProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.DeleteScmBlocksRequestProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.DeleteScmBlocksResponseProto;
import static org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.DeleteScmBlockResult;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.GetScmBlockLocationsRequestProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.GetScmBlockLocationsResponseProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.GetContainerRequestProto;
import org.apache.hadoop.ozone.protocol.proto
@ -83,7 +62,6 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
implements StorageContainerLocationProtocolPB {
private final StorageContainerLocationProtocol impl;
private final ScmBlockLocationProtocol blockImpl;
/**
* Creates a new StorageContainerLocationProtocolServerSideTranslatorPB.
@ -91,10 +69,8 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
* @param impl {@link StorageContainerLocationProtocol} server implementation
*/
public StorageContainerLocationProtocolServerSideTranslatorPB(
StorageContainerLocationProtocol impl,
ScmBlockLocationProtocol blockImpl) throws IOException {
StorageContainerLocationProtocol impl) throws IOException {
this.impl = impl;
this.blockImpl = blockImpl;
}
@Override
@ -170,77 +146,4 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
throw new ServiceException(e);
}
}
@Override
public GetScmBlockLocationsResponseProto getScmBlockLocations(
RpcController controller, GetScmBlockLocationsRequestProto req)
throws ServiceException {
Set<String> keys = Sets.newLinkedHashSetWithExpectedSize(
req.getKeysCount());
for (String key : req.getKeysList()) {
keys.add(key);
}
final Set<AllocatedBlock> blocks;
try {
blocks = blockImpl.getBlockLocations(keys);
} catch (IOException ex) {
throw new ServiceException(ex);
}
GetScmBlockLocationsResponseProto.Builder resp =
GetScmBlockLocationsResponseProto.newBuilder();
for (AllocatedBlock block: blocks) {
ScmLocatedBlockProto.Builder locatedBlock =
ScmLocatedBlockProto.newBuilder()
.setKey(block.getKey())
.setPipeline(block.getPipeline().getProtobufMessage());
resp.addLocatedBlocks(locatedBlock.build());
}
return resp.build();
}
@Override
public AllocateScmBlockResponseProto allocateScmBlock(
RpcController controller, AllocateScmBlockRequestProto request)
throws ServiceException {
try {
AllocatedBlock allocatedBlock =
blockImpl.allocateBlock(request.getSize());
if (allocatedBlock != null) {
return
AllocateScmBlockResponseProto.newBuilder()
.setKey(allocatedBlock.getKey())
.setPipeline(allocatedBlock.getPipeline().getProtobufMessage())
.setCreateContainer(allocatedBlock.getCreateContainer())
.setErrorCode(AllocateScmBlockResponseProto.Error.success)
.build();
} else {
return
AllocateScmBlockResponseProto.newBuilder()
.setErrorCode(AllocateScmBlockResponseProto.Error.unknownFailure)
.build();
}
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public DeleteScmBlocksResponseProto deleteScmBlocks(
RpcController controller, DeleteScmBlocksRequestProto req)
throws ServiceException {
Set<String> keys = Sets.newLinkedHashSetWithExpectedSize(
req.getKeysCount());
for (String key : req.getKeysList()) {
keys.add(key);
}
final List<DeleteBlockResult> results = blockImpl.deleteBlocks(keys);
DeleteScmBlocksResponseProto.Builder resp =
DeleteScmBlocksResponseProto.newBuilder();
for (DeleteBlockResult result: results) {
DeleteScmBlockResult.Builder deleteResult = DeleteScmBlockResult
.newBuilder().setKey(result.getKey()).setResult(result.getResult());
resp.addResults(deleteResult.build());
}
return resp.build();
}
}

View File

@ -30,6 +30,9 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos;
import org.apache.hadoop.ozone.protocolPB
.ScmBlockLocationProtocolServerSideTranslatorPB;
import org.apache.hadoop.ozone.scm.block.BlockManager;
import org.apache.hadoop.ozone.scm.block.BlockManagerImpl;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
@ -73,6 +76,7 @@ import org.apache.hadoop.ozone.protocol.proto
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
import org.apache.hadoop.ozone.protocolPB
.StorageContainerDatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB;
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.ozone.protocolPB
.StorageContainerLocationProtocolServerSideTranslatorPB;
@ -97,8 +101,10 @@ import java.util.Set;
import java.util.UUID;
import static org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.DeleteScmBlockResult.Result;
.ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result;
import static org.apache.hadoop.scm.ScmConfigKeys
.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY;
import static org.apache.hadoop.scm.ScmConfigKeys
.OZONE_SCM_CLIENT_ADDRESS_KEY;
import static org.apache.hadoop.scm.ScmConfigKeys
@ -146,6 +152,10 @@ public class StorageContainerManager
private final RPC.Server clientRpcServer;
private final InetSocketAddress clientRpcAddress;
/** The RPC server that listens to requests from block service clients. */
private final RPC.Server blockRpcServer;
private final InetSocketAddress blockRpcAddress;
/** SCM mxbean. */
private ObjectName scmInfoBeanName;
@ -173,6 +183,8 @@ public class StorageContainerManager
ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf, ScmBlockLocationProtocolPB.class,
ProtobufRpcEngine.class);
BlockingService dnProtoPbService = StorageContainerDatanodeProtocolProtos.
StorageContainerDatanodeProtocolService.newReflectiveBlockingService(
@ -186,12 +198,12 @@ public class StorageContainerManager
datanodeRpcAddress = updateListenAddress(conf,
OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr, datanodeRpcServer);
// SCM Container Service RPC
BlockingService storageProtoPbService =
StorageContainerLocationProtocolProtos
.StorageContainerLocationProtocolService
.newReflectiveBlockingService(
new StorageContainerLocationProtocolServerSideTranslatorPB(
this, this));
new StorageContainerLocationProtocolServerSideTranslatorPB(this));
final InetSocketAddress scmAddress =
OzoneClientUtils.getScmClientBindAddress(conf);
@ -201,6 +213,22 @@ public class StorageContainerManager
clientRpcAddress = updateListenAddress(conf,
OZONE_SCM_CLIENT_ADDRESS_KEY, scmAddress, clientRpcServer);
// SCM Block Service RPC
BlockingService blockProtoPbService =
ScmBlockLocationProtocolProtos
.ScmBlockLocationProtocolService
.newReflectiveBlockingService(
new ScmBlockLocationProtocolServerSideTranslatorPB(this));
final InetSocketAddress scmBlockAddress =
OzoneClientUtils.getScmBlockClientBindAddress(conf);
blockRpcServer = startRpcServer(conf, scmBlockAddress,
ScmBlockLocationProtocolPB.class, blockProtoPbService,
handlerCount);
blockRpcAddress = updateListenAddress(conf,
OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, scmBlockAddress, blockRpcServer);
registerMXBean();
}
@ -447,6 +475,9 @@ public class StorageContainerManager
LOG.info(buildRpcServerStartMessage(
"StorageContainerLocationProtocol RPC server", clientRpcAddress));
clientRpcServer.start();
LOG.info(buildRpcServerStartMessage(
"ScmBlockLocationProtocol RPC server", blockRpcAddress));
blockRpcServer.start();
LOG.info(buildRpcServerStartMessage("RPC server for DataNodes",
datanodeRpcAddress));
datanodeRpcServer.start();
@ -456,6 +487,8 @@ public class StorageContainerManager
* Stop service.
*/
public void stop() {
LOG.info("Stopping block service RPC server");
blockRpcServer.stop();
LOG.info("Stopping the StorageContainerLocationProtocol RPC server");
clientRpcServer.stop();
LOG.info("Stopping the RPC server for DataNodes");
@ -470,6 +503,7 @@ public class StorageContainerManager
*/
public void join() {
try {
blockRpcServer.join();
clientRpcServer.join();
datanodeRpcServer.join();
} catch (InterruptedException e) {

View File

@ -351,6 +351,7 @@ public final class MiniOzoneCluster extends MiniDFSCluster
configScmMetadata();
conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
conf.set(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "127.0.0.1:0");
conf.set(KSMConfigKeys.OZONE_KSM_ADDRESS_KEY, "127.0.0.1:0");