HDFS-2636. Implement protobuf service for ClientDatanodeProtocol. Contributed by Suresh Srinivas.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1211249 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2011-12-07 01:05:56 +00:00
parent ec87e163d4
commit c17bb83644
11 changed files with 4668 additions and 2 deletions

View File

@ -19,6 +19,8 @@ Trunk (unreleased changes)
HDFS-2629. Implement protobuf service for InterDatanodeProtocol. (suresh)
HDFS-2636. Implement protobuf service for ClientDatanodeProtocol. (suresh)
IMPROVEMENTS
HADOOP-7524 Change RPC to allow multiple protocols including multuple

View File

@ -66,6 +66,11 @@ public class BlockLocalPathInfo implements Writable {
*/
public String getBlockPath() {return localBlockPath;}
/**
* @return the Block
*/
public ExtendedBlock getBlock() { return block;}
/**
* Get the Block metadata file.
* @return Block metadata file.

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService;
import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.security.token.TokenInfo;
@KerberosInfo(
serverPrincipal = DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY)
@TokenInfo(BlockTokenSelector.class)
@ProtocolInfo(protocolName =
"org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol",
protocolVersion = 1)
@InterfaceAudience.Private
public interface ClientDatanodeProtocolPB extends
ClientDatanodeProtocolService.BlockingInterface, VersionedProtocol {
/**
* This method is defined to get the protocol signature using
* ProtocolSignatureWritable - suffix of 2 to the method name
* avoids conflict.
*/
public ProtocolSignatureWritable getProtocolSignature2(String protocol,
long clientVersion, int clientMethodsHash) throws IOException;
}

View File

@ -0,0 +1,159 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesResponseProto;
import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.VersionedProtocol;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
* Implementation for protobuf service that forwards requests
* received on {@link ClientDatanodeProtocolPB} to the
* {@link ClientDatanodeProtocol} server implementation.
*/
@InterfaceAudience.Private
public class ClientDatanodeProtocolServerSideTranslatorPB implements
ClientDatanodeProtocolPB {
private final static RefreshNamenodesResponseProto REFRESH_NAMENODE_RESP =
RefreshNamenodesResponseProto.newBuilder().build();
private final static DeleteBlockPoolResponseProto DELETE_BLOCKPOOL_RESP =
DeleteBlockPoolResponseProto.newBuilder().build();
private final ClientDatanodeProtocol impl;
public ClientDatanodeProtocolServerSideTranslatorPB(
ClientDatanodeProtocol impl) {
this.impl = impl;
}
@Override
public GetReplicaVisibleLengthResponseProto getReplicaVisibleLength(
RpcController unused, GetReplicaVisibleLengthRequestProto request)
throws ServiceException {
long len;
try {
len = impl.getReplicaVisibleLength(PBHelper.convert(request.getBlock()));
} catch (IOException e) {
throw new ServiceException(e);
}
return GetReplicaVisibleLengthResponseProto.newBuilder().setLength(len)
.build();
}
@Override
public RefreshNamenodesResponseProto refreshNamenode(
RpcController unused, RefreshNamenodesRequestProto request)
throws ServiceException {
try {
impl.refreshNamenodes();
} catch (IOException e) {
throw new ServiceException(e);
}
return REFRESH_NAMENODE_RESP;
}
@Override
public DeleteBlockPoolResponseProto deleteBlockPool(RpcController unused,
DeleteBlockPoolRequestProto request) throws ServiceException {
try {
impl.deleteBlockPool(request.getBlockPool(), request.getForce());
} catch (IOException e) {
throw new ServiceException(e);
}
return DELETE_BLOCKPOOL_RESP;
}
@Override
public GetBlockLocalPathInfoResponseProto getBlockLocalPathInfo(
RpcController unused, GetBlockLocalPathInfoRequestProto request)
throws ServiceException {
BlockLocalPathInfo resp;
try {
resp = impl.getBlockLocalPathInfo(PBHelper.convert(request.getBlock()), PBHelper.convert(request.getToken()));
} catch (IOException e) {
throw new ServiceException(e);
}
return GetBlockLocalPathInfoResponseProto.newBuilder()
.setBlock(PBHelper.convert(resp.getBlock()))
.setLocalPath(resp.getBlockPath()).setLocalMetaPath(resp.getMetaPath())
.build();
}
@Override
public long getProtocolVersion(String protocol, long clientVersion)
throws IOException {
return RPC.getProtocolVersion(ClientDatanodeProtocolPB.class);
}
/**
* The client side will redirect getProtocolSignature to
* getProtocolSignature2.
*
* However the RPC layer below on the Server side will call getProtocolVersion
* and possibly in the future getProtocolSignature. Hence we still implement
* it even though the end client will never call this method.
*
* @see VersionedProtocol#getProtocolVersion
*/
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
/**
* Don't forward this to the server. The protocol version and signature is
* that of {@link ClientDatanodeProtocol}
*/
if (!protocol.equals(RPC.getProtocolName(ClientDatanodeProtocol.class))) {
throw new IOException("Namenode Serverside implements " +
RPC.getProtocolName(ClientDatanodeProtocol.class) +
". The following requested protocol is unknown: " + protocol);
}
return ProtocolSignature.getProtocolSignature(clientMethodsHash,
RPC.getProtocolVersion(ClientDatanodeProtocolPB.class),
ClientDatanodeProtocolPB.class);
}
@Override
public ProtocolSignatureWritable getProtocolSignature2(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
/**
* Don't forward this to the server. The protocol version and signature is
* that of {@link ClientDatanodeProtocol}
*/
return ProtocolSignatureWritable.convert(
this.getProtocolSignature(protocol, clientVersion, clientMethodsHash));
}
}

View File

@ -0,0 +1,136 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto;
import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.token.Token;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
* This class is the client side translator to translate the requests made on
* {@link ClientDatanodeProtocol} interfaces to the RPC server implementing
* {@link ClientDatanodeProtocolPB}.
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
public class ClientDatanodeProtocolTranslatorPB implements
ClientDatanodeProtocol, Closeable {
/** RpcController is not used and hence is set to null */
private final static RpcController NULL_CONTROLLER = null;
private final ClientDatanodeProtocolPB rpcProxy;
private final static RefreshNamenodesRequestProto REFRESH_NAMENODES =
RefreshNamenodesRequestProto.newBuilder().build();
public ClientDatanodeProtocolTranslatorPB(InetSocketAddress nameNodeAddr,
Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, ClientDatanodeProtocolPB.class,
ProtobufRpcEngine.class);
rpcProxy = RPC.getProxy(ClientDatanodeProtocolPB.class,
RPC.getProtocolVersion(ClientDatanodeProtocolPB.class), nameNodeAddr,
conf);
}
@Override
public void close() {
RPC.stopProxy(rpcProxy);
}
@Override
public long getProtocolVersion(String protocolName, long clientVersion)
throws IOException {
return rpcProxy.getProtocolVersion(protocolName, clientVersion);
}
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
return ProtocolSignatureWritable.convert(rpcProxy.getProtocolSignature2(
protocol, clientVersion, clientMethodsHash));
}
@Override
public long getReplicaVisibleLength(ExtendedBlock b) throws IOException {
GetReplicaVisibleLengthRequestProto req = GetReplicaVisibleLengthRequestProto
.newBuilder().setBlock(PBHelper.convert(b)).build();
try {
return rpcProxy.getReplicaVisibleLength(NULL_CONTROLLER, req).getLength();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public void refreshNamenodes() throws IOException {
try {
rpcProxy.refreshNamenode(NULL_CONTROLLER, REFRESH_NAMENODES);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public void deleteBlockPool(String bpid, boolean force) throws IOException {
DeleteBlockPoolRequestProto req = DeleteBlockPoolRequestProto.newBuilder()
.setBlockPool(bpid).setForce(force).build();
try {
rpcProxy.deleteBlockPool(NULL_CONTROLLER, req);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
Token<BlockTokenIdentifier> token) throws IOException {
GetBlockLocalPathInfoRequestProto req =
GetBlockLocalPathInfoRequestProto.newBuilder()
.setBlock(PBHelper.convert(block))
.setToken(PBHelper.convert(token)).build();
GetBlockLocalPathInfoResponseProto resp;
try {
resp = rpcProxy.getBlockLocalPathInfo(NULL_CONTROLLER, req);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
return new BlockLocalPathInfo(PBHelper.convert(resp.getBlock()),
resp.getLocalPath(), resp.getLocalMetaPath());
}
}

View File

@ -113,7 +113,7 @@ public class InterDatanodeProtocolServerSideTranslatorPB implements
return ProtocolSignature.getProtocolSignature(clientMethodsHash,
RPC.getProtocolVersion(InterDatanodeProtocolPB.class),
InterDatanodeProtocol.class);
InterDatanodeProtocolPB.class);
}

View File

@ -66,6 +66,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import com.google.protobuf.ByteString;
@ -393,6 +394,13 @@ class PBHelper {
.setKind(token.getKind().toString()).setPassword(password)
.setService(token.getService().toString()).build();
}
public static Token<BlockTokenIdentifier> convert(
BlockTokenIdentifierProto blockToken) {
return new Token<BlockTokenIdentifier>(blockToken.getIdentifier()
.toByteArray(), blockToken.getPassword().toByteArray(), new Text(
blockToken.getKind()), new Text(blockToken.getService()));
}
public static ReplicaState convert(ReplicaStateProto state) {
switch (state) {

View File

@ -0,0 +1,122 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// This file contains protocol buffers that are used throughout HDFS -- i.e.
// by the client, server, and data transfer protocols.
option java_package = "org.apache.hadoop.hdfs.protocol.proto";
option java_outer_classname = "ClientDatanodeProtocolProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
import "hdfs.proto";
/**
* block - block for which visible length is requested
*/
message GetReplicaVisibleLengthRequestProto {
required ExtendedBlockProto block = 1;
}
/**
* length - visible length of the block
*/
message GetReplicaVisibleLengthResponseProto {
required uint64 length = 1;
}
/**
* void request
*/
message RefreshNamenodesRequestProto {
}
/**
* void response
*/
message RefreshNamenodesResponseProto {
}
/**
* blockPool - block pool to be deleted
* force - if false, delete the block pool only if it is empty.
* if true, delete the block pool even if it has blocks.
*/
message DeleteBlockPoolRequestProto {
required string blockPool = 1;
required bool force = 2;
}
/**
* void response
*/
message DeleteBlockPoolResponseProto {
}
/**
* Gets the file information where block and its metadata is stored
* block - block for which path information is being requested
* token - block token
*/
message GetBlockLocalPathInfoRequestProto {
required ExtendedBlockProto block = 1;
required BlockTokenIdentifierProto token = 2;
}
/**
* block - block for which file path information is being returned
* localPath - file path where the block data is stored
* localMetaPath - file path where the block meta data is stored
*/
message GetBlockLocalPathInfoResponseProto {
required ExtendedBlockProto block = 1;
required string localPath = 2;
required string localMetaPath = 3;
}
/**
* Protocol used from client to the Datanode.
* See the request and response for details of rpc call.
*/
service ClientDatanodeProtocolService {
/**
* Returns the visible length of the replica
*/
rpc getReplicaVisibleLength(GetReplicaVisibleLengthRequestProto)
returns(GetReplicaVisibleLengthResponseProto);
/**
* Refresh the list of federated namenodes from updated configuration.
* Adds new namenodes and stops the deleted namenodes.
*/
rpc refreshNamenode(RefreshNamenodesRequestProto)
returns(RefreshNamenodesResponseProto);
/**
* Delete the block pool from the datanode.
*/
rpc deleteBlockPool(DeleteBlockPoolRequestProto)
returns(DeleteBlockPoolResponseProto);
/**
* Retrieves the path names of the block file and metadata file stored on the
* local file system.
*/
rpc getBlockLocalPathInfo(GetBlockLocalPathInfoRequestProto)
returns(GetBlockLocalPathInfoResponseProto);
}

View File

@ -38,7 +38,7 @@ message InitReplicaRecoveryRequestProto {
* Repica recovery information
*/
message InitReplicaRecoveryResponseProto {
required ReplicaStateProto state = 1; // State fo the replica
required ReplicaStateProto state = 1; // State of the replica
required BlockProto block = 2; // block information
}

View File

@ -21,6 +21,7 @@ import static junit.framework.Assert.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import org.apache.hadoop.hdfs.protocol.Block;
@ -29,6 +30,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointCommandProto;
@ -43,7 +45,9 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogManifestPro
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
import org.apache.hadoop.hdfs.security.token.block.BlockKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
@ -54,6 +58,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.junit.Test;
/**
@ -279,6 +284,11 @@ public class TestPBHelper {
ExtendedBlockProto bProto = PBHelper.convert(b);
ExtendedBlock b1 = PBHelper.convert(bProto);
assertEquals(b, b1);
b.setBlockId(-1);
bProto = PBHelper.convert(b);
b1 = PBHelper.convert(bProto);
assertEquals(b, b1);
}
@Test
@ -302,4 +312,17 @@ public class TestPBHelper {
Text t1 = new Text(s);
assertEquals(t, t1);
}
@Test
public void testBlockTokenIdentifier() {
Token<BlockTokenIdentifier> token = new Token<BlockTokenIdentifier>(
"identifier".getBytes(), "password".getBytes(), new Text("kind"),
new Text("service"));
BlockTokenIdentifierProto tokenProto = PBHelper.convert(token);
Token<BlockTokenIdentifier> token2 = PBHelper.convert(tokenProto);
assertTrue(Arrays.equals(token.getIdentifier(), token2.getIdentifier()));
assertTrue(Arrays.equals(token.getPassword(), token2.getPassword()));
assertEquals(token.getKind(), token2.getKind());
assertEquals(token.getService(), token2.getService());
}
}