svn merge -c 1212606 from trunk for HDFS-2642.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23-PB@1230417 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-01-12 06:56:57 +00:00
parent d16e2fdd71
commit 0dac3d8f09
14 changed files with 1056 additions and 56 deletions

View File

@ -23,6 +23,8 @@ Release 0.23-PB - Unreleased
HDFS-2636. Implement protobuf service for ClientDatanodeProtocol. (suresh)
HDFS-2642. Protobuf translators for DatanodeProtocol. (jitendra)
IMPROVEMENTS
HDFS-2018. Move all journal stream management code into one place.

View File

@ -0,0 +1,302 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReceivedAndDeletedRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ProcessUpgradeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ProcessUpgradeResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.hdfs.server.protocolR23Compatible.DatanodeWireProtocol;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
* This class is the client side translator to translate the requests made on
* {@link DatanodeProtocolProtocol} interfaces to the RPC server implementing
* {@link DatanodeProtocolProtocolPB}.
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
public class DatanodeProtocolClientSideTranslatorPB implements DatanodeProtocol,
Closeable {
/** RpcController is not used and hence is set to null */
private final static RpcController NULL_CONTROLLER = null;
private final DatanodeProtocolPB rpcProxy;
private static final VersionRequestProto VERSION_REQUEST =
VersionRequestProto.newBuilder().build();
public DatanodeProtocolClientSideTranslatorPB(InetSocketAddress nameNodeAddr,
Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, DatanodeProtocolPB.class,
ProtobufRpcEngine.class);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
rpcProxy = createNamenodeWithRetry(createNamenode(nameNodeAddr, conf, ugi));
}
private static DatanodeProtocolPB createNamenode(
InetSocketAddress nameNodeAddr, Configuration conf,
UserGroupInformation ugi) throws IOException {
return RPC.getProxy(DatanodeProtocolPB.class,
RPC.getProtocolVersion(DatanodeProtocolPB.class), nameNodeAddr, ugi,
conf, NetUtils.getSocketFactory(conf, DatanodeWireProtocol.class));
}
/** Create a {@link NameNode} proxy */
static DatanodeProtocolPB createNamenodeWithRetry(
DatanodeProtocolPB rpcNamenode) {
RetryPolicy createPolicy = RetryPolicies
.retryUpToMaximumCountWithFixedSleep(5,
HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
Map<Class<? extends Exception>, RetryPolicy> remoteExceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class,
createPolicy);
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
exceptionToPolicyMap.put(RemoteException.class, RetryPolicies
.retryByRemoteException(RetryPolicies.TRY_ONCE_THEN_FAIL,
remoteExceptionToPolicyMap));
RetryPolicy methodPolicy = RetryPolicies.retryByException(
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<String, RetryPolicy>();
methodNameToPolicyMap.put("create", methodPolicy);
return (DatanodeProtocolPB) RetryProxy.create(DatanodeProtocolPB.class,
rpcNamenode, methodNameToPolicyMap);
}
@Override
public long getProtocolVersion(String protocol, long clientVersion)
throws IOException {
return rpcProxy.getProtocolVersion(protocol, clientVersion);
}
@Override
public ProtocolSignature getProtocolSignature(String protocolName,
long clientVersion, int clientMethodsHash) throws IOException {
return ProtocolSignatureWritable.convert(rpcProxy.getProtocolSignature2(
protocolName, clientVersion, clientMethodsHash));
}
@Override
public void close() throws IOException {
RPC.stopProxy(rpcProxy);
}
@Override
public DatanodeRegistration registerDatanode(DatanodeRegistration registration)
throws IOException {
RegisterDatanodeRequestProto req = RegisterDatanodeRequestProto
.newBuilder().setRegistration(PBHelper.convert(registration)).build();
RegisterDatanodeResponseProto resp;
try {
resp = rpcProxy.registerDatanode(NULL_CONTROLLER, req);
} catch (ServiceException se) {
throw ProtobufHelper.getRemoteException(se);
}
return PBHelper.convert(resp.getRegistration());
}
@Override
public DatanodeCommand[] sendHeartbeat(DatanodeRegistration registration,
long capacity, long dfsUsed, long remaining, long blockPoolUsed,
int xmitsInProgress, int xceiverCount, int failedVolumes)
throws IOException {
HeartbeatRequestProto req = HeartbeatRequestProto.newBuilder()
.setRegistration(PBHelper.convert(registration)).setCapacity(capacity)
.setCapacity(dfsUsed).setRemaining(remaining)
.setBlockPoolUsed(blockPoolUsed).setXmitsInProgress(xmitsInProgress)
.setXceiverCount(xceiverCount).setFailedVolumes(failedVolumes).build();
HeartbeatResponseProto resp;
try {
resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, req);
} catch (ServiceException se) {
throw ProtobufHelper.getRemoteException(se);
}
DatanodeCommand[] cmds = new DatanodeCommand[resp.getCmdsList().size()];
int index = 0;
for (DatanodeCommandProto p : resp.getCmdsList()) {
cmds[index] = PBHelper.convert(p);
index++;
}
return cmds;
}
@Override
public DatanodeCommand blockReport(DatanodeRegistration registration,
String poolId, long[] blocks) throws IOException {
BlockReportRequestProto.Builder builder = BlockReportRequestProto
.newBuilder().setRegistration(PBHelper.convert(registration))
.setBlockPoolId(poolId);
if (blocks != null) {
for (int i = 0; i < blocks.length; i++) {
builder.setBlocks(i, blocks[i]);
}
}
BlockReportRequestProto req = builder.build();
BlockReportResponseProto resp;
try {
resp = rpcProxy.blockReport(NULL_CONTROLLER, req);
} catch (ServiceException se) {
throw ProtobufHelper.getRemoteException(se);
}
return PBHelper.convert(resp.getCmd());
}
@Override
public void blockReceivedAndDeleted(DatanodeRegistration registration,
String poolId, ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks)
throws IOException {
BlockReceivedAndDeletedRequestProto.Builder builder =
BlockReceivedAndDeletedRequestProto.newBuilder()
.setRegistration(PBHelper.convert(registration))
.setBlockPoolId(poolId);
if (receivedAndDeletedBlocks != null) {
for (int i = 0; i < receivedAndDeletedBlocks.length; i++) {
builder.setBlocks(i, PBHelper.convert(receivedAndDeletedBlocks[i]));
}
}
BlockReceivedAndDeletedRequestProto req = builder.build();
try {
rpcProxy.blockReceivedAndDeleted(NULL_CONTROLLER, req);
} catch (ServiceException se) {
throw ProtobufHelper.getRemoteException(se);
}
}
@Override
public void errorReport(DatanodeRegistration registration, int errorCode,
String msg) throws IOException {
ErrorReportRequestProto req = ErrorReportRequestProto.newBuilder()
.setRegistartion(PBHelper.convert(registration))
.setErrorCode(errorCode).setMsg(msg).build();
try {
rpcProxy.errorReport(NULL_CONTROLLER, req);
} catch (ServiceException se) {
throw ProtobufHelper.getRemoteException(se);
}
}
@Override
public NamespaceInfo versionRequest() throws IOException {
try {
return PBHelper.convert(rpcProxy.versionRequest(NULL_CONTROLLER,
VERSION_REQUEST).getInfo());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public UpgradeCommand processUpgradeCommand(UpgradeCommand comm)
throws IOException {
ProcessUpgradeRequestProto req = ProcessUpgradeRequestProto.newBuilder()
.setCmd(PBHelper.convert(comm)).build();
ProcessUpgradeResponseProto resp;
try {
resp = rpcProxy.processUpgrade(NULL_CONTROLLER, req);
} catch (ServiceException se) {
throw ProtobufHelper.getRemoteException(se);
}
return PBHelper.convert(resp.getCmd());
}
@Override
public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
ReportBadBlocksRequestProto.Builder builder = ReportBadBlocksRequestProto
.newBuilder();
for (int i = 0; i < blocks.length; i++) {
builder.addBlocks(i, PBHelper.convert(blocks[i]));
}
ReportBadBlocksRequestProto req = builder.build();
try {
rpcProxy.reportBadBlocks(NULL_CONTROLLER, req);
} catch (ServiceException se) {
throw ProtobufHelper.getRemoteException(se);
}
}
@Override
public void commitBlockSynchronization(ExtendedBlock block,
long newgenerationstamp, long newlength, boolean closeFile,
boolean deleteblock, DatanodeID[] newtargets) throws IOException {
CommitBlockSynchronizationRequestProto.Builder builder =
CommitBlockSynchronizationRequestProto.newBuilder()
.setBlock(PBHelper.convert(block)).setNewGenStamp(newgenerationstamp)
.setNewLength(newlength).setCloseFile(closeFile)
.setDeleteBlock(deleteblock);
for (int i = 0; i < newtargets.length; i++) {
builder.setNewTaragets(i, PBHelper.convert(newtargets[i]));
}
CommitBlockSynchronizationRequestProto req = builder.build();
try {
rpcProxy.commitBlockSynchronization(NULL_CONTROLLER, req);
} catch (ServiceException se) {
throw ProtobufHelper.getRemoteException(se);
}
}
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService;
import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.security.KerberosInfo;
@KerberosInfo(
serverPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY,
clientPrincipal = DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY)
@ProtocolInfo(
protocolName = "org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol",
protocolVersion = 1)
@InterfaceAudience.Private
public interface DatanodeProtocolPB extends
DatanodeProtocolService.BlockingInterface, VersionedProtocol {
/**
* This method is defined to get the protocol signature using
* the R23 protocol - hence we have added the suffix of 2 the method name
* to avoid conflict.
*/
public ProtocolSignatureWritable getProtocolSignature2(String protocol,
long clientVersion, int clientMethodsHash) throws IOException;
}

View File

@ -0,0 +1,263 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReceivedAndDeletedRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReceivedAndDeletedResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ProcessUpgradeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ProcessUpgradeResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionResponseProto;
import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
public class DatanodeProtocolServerSideTranslatorPB implements
DatanodeProtocolPB {
private final DatanodeProtocol impl;
private static final ErrorReportResponseProto ERROR_REPORT_RESPONSE_PROTO =
ErrorReportResponseProto.newBuilder().build();
private static final BlockReceivedAndDeletedResponseProto
BLOCK_RECEIVED_AND_DELETE_RESPONSE =
BlockReceivedAndDeletedResponseProto.newBuilder().build();
private static final ReportBadBlocksResponseProto REPORT_BAD_BLOCK_RESPONSE =
ReportBadBlocksResponseProto.newBuilder().build();
private static final CommitBlockSynchronizationResponseProto
COMMIT_BLOCK_SYNCHRONIZATION_RESPONSE_PROTO =
CommitBlockSynchronizationResponseProto.newBuilder().build();
public DatanodeProtocolServerSideTranslatorPB(DatanodeProtocol impl) {
this.impl = impl;
}
@Override
public RegisterDatanodeResponseProto registerDatanode(
RpcController controller, RegisterDatanodeRequestProto request)
throws ServiceException {
DatanodeRegistration registration = PBHelper.convert(request
.getRegistration());
DatanodeRegistration registrationResp;
try {
registrationResp = impl.registerDatanode(registration);
} catch (IOException e) {
throw new ServiceException(e);
}
return RegisterDatanodeResponseProto.newBuilder()
.setRegistration(PBHelper.convert(registrationResp)).build();
}
@Override
public HeartbeatResponseProto sendHeartbeat(RpcController controller,
HeartbeatRequestProto request) throws ServiceException {
DatanodeCommand[] cmds;
try {
cmds = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
request.getCapacity(), request.getDfsUsed(), request.getRemaining(),
request.getBlockPoolUsed(), request.getXmitsInProgress(),
request.getXceiverCount(), request.getFailedVolumes());
} catch (IOException e) {
throw new ServiceException(e);
}
HeartbeatResponseProto.Builder builder = HeartbeatResponseProto
.newBuilder();
if (cmds != null) {
for (int i = 0; i < cmds.length; i++) {
builder.addCmds(i, PBHelper.convert(cmds[i]));
}
}
return builder.build();
}
@Override
public BlockReportResponseProto blockReport(RpcController controller,
BlockReportRequestProto request) throws ServiceException {
DatanodeCommand cmd;
List<Long> blockIds = request.getBlocksList();
long[] blocks = new long[blockIds.size()];
for (int i = 0; i < blockIds.size(); i++) {
blocks[i] = blockIds.get(i);
}
try {
cmd = impl.blockReport(PBHelper.convert(request.getRegistration()),
request.getBlockPoolId(), blocks);
} catch (IOException e) {
throw new ServiceException(e);
}
return BlockReportResponseProto.newBuilder().setCmd(PBHelper.convert(cmd))
.build();
}
@Override
public BlockReceivedAndDeletedResponseProto blockReceivedAndDeleted(
RpcController controller, BlockReceivedAndDeletedRequestProto request)
throws ServiceException {
List<ReceivedDeletedBlockInfoProto> rdbip = request.getBlocksList();
ReceivedDeletedBlockInfo[] info =
new ReceivedDeletedBlockInfo[rdbip.size()];
for (int i = 0; i < rdbip.size(); i++) {
info[i] = PBHelper.convert(rdbip.get(i));
}
try {
impl.blockReceivedAndDeleted(PBHelper.convert(request.getRegistration()),
request.getBlockPoolId(), info);
} catch (IOException e) {
throw new ServiceException(e);
}
return BLOCK_RECEIVED_AND_DELETE_RESPONSE;
}
@Override
public ErrorReportResponseProto errorReport(RpcController controller,
ErrorReportRequestProto request) throws ServiceException {
try {
impl.errorReport(PBHelper.convert(request.getRegistartion()),
request.getErrorCode(), request.getMsg());
} catch (IOException e) {
throw new ServiceException(e);
}
return ERROR_REPORT_RESPONSE_PROTO;
}
@Override
public VersionResponseProto versionRequest(RpcController controller,
VersionRequestProto request) throws ServiceException {
NamespaceInfo info;
try {
info = impl.versionRequest();
} catch (IOException e) {
throw new ServiceException(e);
}
return VersionResponseProto.newBuilder()
.setInfo(PBHelper.convert(info)).build();
}
@Override
public ProcessUpgradeResponseProto processUpgrade(RpcController controller,
ProcessUpgradeRequestProto request) throws ServiceException {
UpgradeCommand cmd;
try {
cmd = impl.processUpgradeCommand(PBHelper.convert(request.getCmd()));
} catch (IOException e) {
throw new ServiceException(e);
}
return ProcessUpgradeResponseProto.newBuilder()
.setCmd(PBHelper.convert(cmd)).build();
}
@Override
public ReportBadBlocksResponseProto reportBadBlocks(RpcController controller,
ReportBadBlocksRequestProto request) throws ServiceException {
List<LocatedBlockProto> lbps = request.getBlocksList();
LocatedBlock [] blocks = new LocatedBlock [lbps.size()];
for(int i=0; i<lbps.size(); i++) {
blocks[i] = PBHelper.convert(lbps.get(i));
}
try {
impl.reportBadBlocks(blocks);
} catch (IOException e) {
throw new ServiceException(e);
}
return REPORT_BAD_BLOCK_RESPONSE;
}
@Override
public CommitBlockSynchronizationResponseProto commitBlockSynchronization(
RpcController controller, CommitBlockSynchronizationRequestProto request)
throws ServiceException {
List<DatanodeIDProto> dnprotos = request.getNewTaragetsList();
DatanodeID[] dns = new DatanodeID[dnprotos.size()];
for (int i = 0; i < dnprotos.size(); i++) {
dns[i] = PBHelper.convert(dnprotos.get(i));
}
try {
impl.commitBlockSynchronization(PBHelper.convert(request.getBlock()),
request.getNewGenStamp(), request.getNewLength(),
request.getCloseFile(), request.getDeleteBlock(), dns);
} catch (IOException e) {
throw new ServiceException(e);
}
return COMMIT_BLOCK_SYNCHRONIZATION_RESPONSE_PROTO;
}
@Override
public long getProtocolVersion(String protocol, long clientVersion)
throws IOException {
return RPC.getProtocolVersion(DatanodeProtocolPB.class);
}
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
/**
* Don't forward this to the server. The protocol version and signature is
* that of {@link DatanodeProtocol}
*/
if (!protocol.equals(RPC.getProtocolName(DatanodeProtocolPB.class))) {
throw new IOException("Namenode Serverside implements " +
RPC.getProtocolName(DatanodeProtocolPB.class) +
". The following requested protocol is unknown: " + protocol);
}
return ProtocolSignature.getProtocolSignature(clientMethodsHash,
RPC.getProtocolVersion(DatanodeProtocolPB.class),
DatanodeProtocolPB.class);
}
@Override
public ProtocolSignatureWritable getProtocolSignature2(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
/**
* Don't forward this to the server. The protocol version and signature is
* that of {@link DatanodeProtocolPB}
*/
return ProtocolSignatureWritable.convert(
this.getProtocolSignature(protocol, clientVersion, clientMethodsHash));
}
}

View File

@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import java.io.IOException;

View File

@ -20,7 +20,8 @@ package org.apache.hadoop.hdfs.protocolPB;
import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamespaceInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.ErrorReportRequestProto;
@ -39,8 +40,6 @@ import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RollEditLogR
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RollEditLogResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.StartCheckpointRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.StartCheckpointResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.VersionRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.VersionResponseProto;
import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
@ -240,14 +239,6 @@ public class NamenodeProtocolServerSideTranslatorPB implements
throw new ServiceException(e);
}
return VersionResponseProto.newBuilder()
.setInfo(convert(info)).build();
}
private NamespaceInfoProto convert(NamespaceInfo info) {
return NamespaceInfoProto.newBuilder()
.setBlockPoolID(info.getBlockPoolID())
.setBuildVersion(info.getBuildVersion())
.setDistUpgradeVersion(info.getDistributedUpgradeVersion())
.setStorageInfo(PBHelper.convert(info)).build();
.setInfo(PBHelper.convert(info)).build();
}
}

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.ErrorReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysRequestProto;
@ -41,7 +42,6 @@ import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransacti
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RegisterRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RollEditLogRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.StartCheckpointRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.VersionRequestProto;
import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;

View File

@ -27,6 +27,16 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.FinalizeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.KeyUpdateCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.UpgradeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProto;
@ -37,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointSignatureProto
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto.AdminState;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfosProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExportedBlockKeysProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
@ -57,15 +68,26 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
@ -337,10 +359,10 @@ class PBHelper {
}
public static DatanodeInfoProto convert(DatanodeInfo info) {
return DatanodeInfoProto.newBuilder()
.setAdminState(PBHelper.convert(info.getAdminState()))
.setBlockPoolUsed(info.getBlockPoolUsed())
.setCapacity(info.getCapacity())
DatanodeInfoProto.Builder builder = DatanodeInfoProto.newBuilder();
builder.setBlockPoolUsed(info.getBlockPoolUsed());
builder.setAdminState(PBHelper.convert(info.getAdminState()));
builder.setCapacity(info.getCapacity())
.setDfsUsed(info.getDfsUsed())
.setHostName(info.getHostName())
.setId(PBHelper.convert((DatanodeID)info))
@ -349,6 +371,7 @@ class PBHelper {
.setRemaining(info.getRemaining())
.setXceiverCount(info.getXceiverCount())
.build();
return builder.build();
}
public static AdminStates convert(AdminState adminState) {
@ -378,13 +401,25 @@ class PBHelper {
public static LocatedBlockProto convert(LocatedBlock b) {
Builder builder = LocatedBlockProto.newBuilder();
DatanodeInfo[] locs = b.getLocations();
for(DatanodeInfo loc : locs) {
builder.addLocs(PBHelper.convert(loc));
for (int i = 0; i < locs.length; i++) {
builder.addLocs(i, PBHelper.convert(locs[i]));
}
return builder.setB(PBHelper.convert(b.getBlock()))
.setBlockToken(PBHelper.convert(b.getBlockToken()))
.setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build();
}
public static LocatedBlock convert(LocatedBlockProto proto) {
List<DatanodeInfoProto> locs = proto.getLocsList();
DatanodeInfo[] targets = new DatanodeInfo[locs.size()];
for (int i = 0; i < locs.size(); i++) {
targets[i] = PBHelper.convert(locs.get(i));
}
LocatedBlock lb = new LocatedBlock(PBHelper.convert(proto.getB()), targets,
proto.getOffset(), proto.getCorrupt());
lb.setBlockToken(PBHelper.convert(proto.getBlockToken()));
return lb;
}
public static BlockTokenIdentifierProto convert(
Token<BlockTokenIdentifier> token) {
@ -417,4 +452,245 @@ class PBHelper {
return ReplicaState.FINALIZED;
}
}
public static DatanodeRegistrationProto convert(
DatanodeRegistration registration) {
DatanodeRegistrationProto.Builder builder = DatanodeRegistrationProto
.newBuilder();
return builder.setDatanodeID(PBHelper.convert((DatanodeID) registration))
.setStorageInfo(PBHelper.convert(registration.storageInfo))
.setKeys(PBHelper.convert(registration.exportedKeys)).build();
}
public static DatanodeRegistration convert(DatanodeRegistrationProto proto) {
return new DatanodeRegistration(PBHelper.convert(proto.getDatanodeID()),
PBHelper.convert(proto.getStorageInfo()), PBHelper.convert(proto
.getKeys()));
}
public static DatanodeCommand convert(DatanodeCommandProto proto) {
switch (proto.getCmdType()) {
case BalancerBandwidthCommand:
return PBHelper.convert(proto.getBalancerCmd());
case BlockCommand:
return PBHelper.convert(proto.getBlkCmd());
case BlockRecoveryCommand:
return PBHelper.convert(proto.getRecoveryCmd());
case FinalizeCommand:
return PBHelper.convert(proto.getFinalizeCmd());
case KeyUpdateCommand:
return PBHelper.convert(proto.getKeyUpdateCmd());
case RegisterCommand:
return PBHelper.convert(proto.getRegisterCmd());
case UpgradeCommand:
return PBHelper.convert(proto.getUpgradeCmd());
}
return null;
}
public static BalancerBandwidthCommandProto convert(
BalancerBandwidthCommand bbCmd) {
return BalancerBandwidthCommandProto.newBuilder()
.setBandwidth(bbCmd.getBalancerBandwidthValue()).build();
}
public static KeyUpdateCommandProto convert(KeyUpdateCommand cmd) {
return KeyUpdateCommandProto.newBuilder()
.setKeys(PBHelper.convert(cmd.getExportedKeys())).build();
}
public static BlockRecoveryCommandProto convert(BlockRecoveryCommand cmd) {
BlockRecoveryCommandProto.Builder builder = BlockRecoveryCommandProto
.newBuilder();
for (RecoveringBlock b : cmd.getRecoveringBlocks()) {
builder.addBlocks(PBHelper.convert(b));
}
return builder.build();
}
public static FinalizeCommandProto convert(FinalizeCommand cmd) {
return FinalizeCommandProto.newBuilder()
.setBlockPoolId(cmd.getBlockPoolId()).build();
}
public static RegisterCommandProto convert(RegisterCommand cmd) {
return RegisterCommandProto.newBuilder().build();
}
public static BlockCommandProto convert(BlockCommand cmd) {
BlockCommandProto.Builder builder = BlockCommandProto.newBuilder()
.setBlockPoolId(cmd.getBlockPoolId());
switch (cmd.getAction()) {
case DatanodeProtocol.DNA_TRANSFER:
builder.setAction(BlockCommandProto.Action.TRANSFER);
break;
case DatanodeProtocol.DNA_INVALIDATE:
builder.setAction(BlockCommandProto.Action.INVALIDATE);
break;
}
Block[] blocks = cmd.getBlocks();
for (int i = 0; i < blocks.length; i++) {
builder.addBlocks(PBHelper.convert(blocks[i]));
}
DatanodeInfo[][] infos = cmd.getTargets();
for (int i = 0; i < infos.length; i++) {
builder.addTargets(PBHelper.convert(infos[i]));
}
return builder.build();
}
public static DatanodeInfosProto convert(DatanodeInfo[] datanodeInfos) {
DatanodeInfosProto.Builder builder = DatanodeInfosProto.newBuilder();
for (int i = 0; i < datanodeInfos.length; i++) {
builder.addDatanodes(PBHelper.convert(datanodeInfos[i]));
}
return builder.build();
}
public static DatanodeCommandProto convert(DatanodeCommand datanodeCommand) {
DatanodeCommandProto.Builder builder = DatanodeCommandProto.newBuilder();
switch (datanodeCommand.getAction()) {
case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
builder.setCmdType(DatanodeCommandProto.Type.BalancerBandwidthCommand)
.setBalancerCmd(
PBHelper.convert((BalancerBandwidthCommand) datanodeCommand));
break;
case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
builder
.setCmdType(DatanodeCommandProto.Type.KeyUpdateCommand)
.setKeyUpdateCmd(PBHelper.convert((KeyUpdateCommand) datanodeCommand));
break;
case DatanodeProtocol.DNA_RECOVERBLOCK:
builder.setCmdType(DatanodeCommandProto.Type.BlockRecoveryCommand)
.setRecoveryCmd(
PBHelper.convert((BlockRecoveryCommand) datanodeCommand));
break;
case DatanodeProtocol.DNA_FINALIZE:
builder.setCmdType(DatanodeCommandProto.Type.FinalizeCommand)
.setFinalizeCmd(PBHelper.convert((FinalizeCommand) datanodeCommand));
break;
case DatanodeProtocol.DNA_REGISTER:
builder.setCmdType(DatanodeCommandProto.Type.RegisterCommand)
.setRegisterCmd(PBHelper.convert((RegisterCommand) datanodeCommand));
break;
case DatanodeProtocol.DNA_TRANSFER:
case DatanodeProtocol.DNA_INVALIDATE:
builder.setCmdType(DatanodeCommandProto.Type.BlockCommand).setBlkCmd(
PBHelper.convert((BlockCommand) datanodeCommand));
break;
case DatanodeProtocol.DNA_SHUTDOWN: //Not expected
case DatanodeProtocol.DNA_UNKNOWN: //Not expected
}
return builder.build();
}
public static UpgradeCommand convert(UpgradeCommandProto upgradeCmd) {
int action = UpgradeCommand.UC_ACTION_UNKNOWN;
switch (upgradeCmd.getAction()) {
case REPORT_STATUS:
action = UpgradeCommand.UC_ACTION_REPORT_STATUS;
break;
case START_UPGRADE:
action = UpgradeCommand.UC_ACTION_START_UPGRADE;
}
return new UpgradeCommand(action, upgradeCmd.getVersion(),
(short) upgradeCmd.getUpgradeStatus());
}
public static RegisterCommand convert(RegisterCommandProto registerCmd) {
return new RegisterCommand();
}
public static KeyUpdateCommand convert(KeyUpdateCommandProto keyUpdateCmd) {
return new KeyUpdateCommand(PBHelper.convert(keyUpdateCmd.getKeys()));
}
public static FinalizeCommand convert(FinalizeCommandProto finalizeCmd) {
return new FinalizeCommand(finalizeCmd.getBlockPoolId());
}
public static BlockRecoveryCommand convert(
BlockRecoveryCommandProto recoveryCmd) {
List<RecoveringBlockProto> list = recoveryCmd.getBlocksList();
List<RecoveringBlock> recoveringBlocks = new ArrayList<RecoveringBlock>(
list.size());
for (int i = 0; i < list.size(); i++) {
recoveringBlocks.add(PBHelper.convert(list.get(0)));
}
return new BlockRecoveryCommand(recoveringBlocks);
}
public static BlockCommand convert(BlockCommandProto blkCmd) {
List<BlockProto> blockProtoList = blkCmd.getBlocksList();
List<DatanodeInfosProto> targetList = blkCmd.getTargetsList();
DatanodeInfo[][] targets = new DatanodeInfo[blockProtoList.size()][];
Block[] blocks = new Block[blockProtoList.size()];
for (int i = 0; i < blockProtoList.size(); i++) {
targets[i] = PBHelper.convert(targetList.get(i));
blocks[i] = PBHelper.convert(blockProtoList.get(i));
}
int action = DatanodeProtocol.DNA_UNKNOWN;
switch (blkCmd.getAction()) {
case TRANSFER:
action = DatanodeProtocol.DNA_TRANSFER;
break;
case INVALIDATE:
action = DatanodeProtocol.DNA_INVALIDATE;
break;
}
return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets);
}
public static DatanodeInfo[] convert(DatanodeInfosProto datanodeInfosProto) {
List<DatanodeInfoProto> proto = datanodeInfosProto.getDatanodesList();
DatanodeInfo[] infos = new DatanodeInfo[proto.size()];
for (int i = 0; i < infos.length; i++) {
infos[i] = PBHelper.convert(proto.get(i));
}
return infos;
}
public static BalancerBandwidthCommand convert(
BalancerBandwidthCommandProto balancerCmd) {
return new BalancerBandwidthCommand(balancerCmd.getBandwidth());
}
public static ReceivedDeletedBlockInfoProto convert(
ReceivedDeletedBlockInfo receivedDeletedBlockInfo) {
return ReceivedDeletedBlockInfoProto.newBuilder()
.setBlock(PBHelper.convert(receivedDeletedBlockInfo.getBlock()))
.setDeleteHint(receivedDeletedBlockInfo.getDelHints()).build();
}
public static UpgradeCommandProto convert(UpgradeCommand comm) {
UpgradeCommandProto.Builder builder = UpgradeCommandProto.newBuilder()
.setVersion(comm.getVersion())
.setUpgradeStatus(comm.getCurrentStatus());
switch (comm.getAction()) {
case UpgradeCommand.UC_ACTION_REPORT_STATUS:
builder.setAction(UpgradeCommandProto.Action.REPORT_STATUS);
break;
case UpgradeCommand.UC_ACTION_START_UPGRADE:
builder.setAction(UpgradeCommandProto.Action.START_UPGRADE);
break;
default:
builder.setAction(UpgradeCommandProto.Action.UNKNOWN);
break;
}
return builder.build();
}
public static ReceivedDeletedBlockInfo convert(
ReceivedDeletedBlockInfoProto proto) {
return new ReceivedDeletedBlockInfo(PBHelper.convert(proto.getBlock()),
proto.getDeleteHint());
}
public static NamespaceInfoProto convert(NamespaceInfo info) {
return NamespaceInfoProto.newBuilder()
.setBlockPoolID(info.getBlockPoolID())
.setBuildVersion(info.getBuildVersion())
.setDistUpgradeVersion(info.getDistributedUpgradeVersion())
.setStorageInfo(PBHelper.convert((StorageInfo)info)).build();
}
}

View File

@ -66,6 +66,13 @@ implements Writable, NodeRegistration {
this(nodeName, new StorageInfo(), new ExportedBlockKeys());
}
public DatanodeRegistration(DatanodeID dn, StorageInfo info,
ExportedBlockKeys keys) {
super(dn);
this.storageInfo = info;
this.exportedKeys = keys;
}
public DatanodeRegistration(String nodeName, StorageInfo info,
ExportedBlockKeys keys) {
super(nodeName);

View File

@ -40,7 +40,7 @@ import org.apache.hadoop.io.WritableFactory;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class UpgradeCommand extends DatanodeCommand {
final static int UC_ACTION_UNKNOWN = DatanodeProtocol.DNA_UNKNOWN;
public final static int UC_ACTION_UNKNOWN = DatanodeProtocol.DNA_UNKNOWN;
public final static int UC_ACTION_REPORT_STATUS = 100; // report upgrade status
public final static int UC_ACTION_START_UPGRADE = 101; // start upgrade

View File

@ -31,7 +31,7 @@ import "hdfs.proto";
*/
message DatanodeRegistrationProto {
required DatanodeIDProto datanodeID = 1; // Datanode information
required StorageInfoProto storateInfo = 2; // Node information
required StorageInfoProto storageInfo = 2; // Node information
required ExportedBlockKeysProto keys = 3; // Block keys
}
@ -55,7 +55,7 @@ message DatanodeCommandProto {
// cmdType is set
optional BalancerBandwidthCommandProto balancerCmd = 2;
optional BlockCommandProto blkCmd = 3;
optional BlockRecoveryCommndProto recoveryCmd = 4;
optional BlockRecoveryCommandProto recoveryCmd = 4;
optional FinalizeCommandProto finalizeCmd = 5;
optional KeyUpdateCommandProto keyUpdateCmd = 6;
optional RegisterCommandProto registerCmd = 7;
@ -77,22 +77,20 @@ message BalancerBandwidthCommandProto {
* on the given set of blocks.
*/
message BlockCommandProto {
enum Action {
UNKNOWN = 0; // Unknown action
enum Action {
TRANSFER = 1; // Transfer blocks to another datanode
INVALIDATE = 2; // Invalidate blocks
SHUTDOWN = 3; // Shutdown node
}
required uint32 action = 1;
required Action action = 1;
required string blockPoolId = 2;
repeated BlockProto blocks = 3;
repeated DatanodeIDsProto targets = 4;
repeated DatanodeInfosProto targets = 4;
}
/**
* List of blocks to be recovered by the datanode
*/
message BlockRecoveryCommndProto {
message BlockRecoveryCommandProto {
repeated RecoveringBlockProto blocks = 1;
}
@ -126,7 +124,7 @@ message UpgradeCommandProto {
REPORT_STATUS = 100; // Report upgrade status
START_UPGRADE = 101; // Start upgrade
}
required uint32 action = 1; // Upgrade action
required Action action = 1; // Upgrade action
required uint32 version = 2; // Version of the upgrade
required uint32 upgradeStatus = 3; // % completed in range 0 & 100
}
@ -324,6 +322,11 @@ service DatanodeProtocolService {
* Used for debugging.
*/
rpc errorReport(ErrorReportRequestProto) returns(ErrorReportResponseProto);
/**
* Request the version
*/
rpc versionRequest(VersionRequestProto) returns(VersionResponseProto);
/**
* Generic way to send commands from datanode to namenode during

View File

@ -84,19 +84,6 @@ message RollEditLogResponseProto {
required CheckpointSignatureProto signature = 1;
}
/**
* void request
*/
message VersionRequestProto {
}
/**
* void request
*/
message VersionResponseProto {
required NamespaceInfoProto info = 1;
}
/**
* registration - Namenode reporting the error
* errorCode - error code indicating the error

View File

@ -54,10 +54,10 @@ message DatanodeIDProto {
}
/**
* DatanodeID array
* DatanodeInfo array
*/
message DatanodeIDsProto {
repeated DatanodeIDProto datanodes = 1;
message DatanodeInfosProto {
repeated DatanodeInfoProto datanodes = 1;
}
/**
@ -345,3 +345,16 @@ message RecoveringBlockProto {
required LocatedBlockProto block = 2; // Block to be recovered
}
/**
* void request
*/
message VersionRequestProto {
}
/**
* Version response from namenode.
*/
message VersionResponseProto {
required NamespaceInfoProto info = 1;
}

View File

@ -17,29 +17,34 @@
*/
package org.apache.hadoop.hdfs.protocolPB;
import static junit.framework.Assert.*;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointSignatureProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExportedBlockKeysProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto.NamenodeRoleProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamespaceInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RecoveringBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogManifestProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogProto;
@ -47,14 +52,17 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
import org.apache.hadoop.hdfs.security.token.block.BlockKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.io.Text;
@ -120,6 +128,10 @@ public class TestPBHelper {
DatanodeID dn = new DatanodeID("node", "sid", 1, 2);
DatanodeIDProto dnProto = PBHelper.convert(dn);
DatanodeID dn2 = PBHelper.convert(dnProto);
compare(dn, dn2);
}
void compare(DatanodeID dn, DatanodeID dn2) {
assertEquals(dn.getHost(), dn2.getHost());
assertEquals(dn.getInfoPort(), dn2.getInfoPort());
assertEquals(dn.getIpcPort(), dn2.getIpcPort());
@ -177,7 +189,6 @@ public class TestPBHelper {
assertEquals(k1.getExpiryDate(), k2.getExpiryDate());
assertEquals(k1.getKeyId(), k2.getKeyId());
assertTrue(Arrays.equals(k1.getEncodedKey(), k2.getEncodedKey()));
}
@Test
@ -195,7 +206,10 @@ public class TestPBHelper {
getBlockKey(1), keys);
ExportedBlockKeysProto expKeysProto = PBHelper.convert(expKeys);
ExportedBlockKeys expKeys1 = PBHelper.convert(expKeysProto);
compare(expKeys, expKeys1);
}
void compare(ExportedBlockKeys expKeys, ExportedBlockKeys expKeys1) {
BlockKey[] allKeys = expKeys.getAllKeys();
BlockKey[] allKeys1 = expKeys1.getAllKeys();
assertEquals(allKeys.length, allKeys1.length);
@ -314,15 +328,108 @@ public class TestPBHelper {
}
@Test
public void testBlockTokenIdentifier() {
public void testConvertBlockToken() {
Token<BlockTokenIdentifier> token = new Token<BlockTokenIdentifier>(
"identifier".getBytes(), "password".getBytes(), new Text("kind"),
new Text("service"));
BlockTokenIdentifierProto tokenProto = PBHelper.convert(token);
Token<BlockTokenIdentifier> token2 = PBHelper.convert(tokenProto);
assertTrue(Arrays.equals(token.getIdentifier(), token2.getIdentifier()));
assertTrue(Arrays.equals(token.getPassword(), token2.getPassword()));
assertEquals(token.getKind(), token2.getKind());
assertEquals(token.getService(), token2.getService());
compare(token, token2);
}
@Test
public void testConvertNamespaceInfo() {
NamespaceInfo info = new NamespaceInfo(37, "clusterID", "bpID", 2300, 53);
NamespaceInfoProto proto = PBHelper.convert(info);
NamespaceInfo info2 = PBHelper.convert(proto);
compare(info, info2); //Compare the StorageInfo
assertEquals(info.getBlockPoolID(), info2.getBlockPoolID());
assertEquals(info.getBuildVersion(), info2.getBuildVersion());
assertEquals(info.getDistributedUpgradeVersion(),
info2.getDistributedUpgradeVersion());
}
private void compare(StorageInfo expected, StorageInfo actual) {
assertEquals(expected.clusterID, actual.clusterID);
assertEquals(expected.namespaceID, actual.namespaceID);
assertEquals(expected.cTime, actual.cTime);
assertEquals(expected.layoutVersion, actual.layoutVersion);
}
private void compare(Token<BlockTokenIdentifier> expected,
Token<BlockTokenIdentifier> actual) {
assertTrue(Arrays.equals(expected.getIdentifier(), actual.getIdentifier()));
assertTrue(Arrays.equals(expected.getPassword(), actual.getPassword()));
assertEquals(expected.getKind(), actual.getKind());
assertEquals(expected.getService(), actual.getService());
}
@Test
public void testConvertLocatedBlock() {
DatanodeInfo [] dnInfos = new DatanodeInfo[3];
dnInfos[0] = new DatanodeInfo("host0", "0", 5000, 5001, 20000, 10001, 9999,
59, 69, 32, "local", "host0", AdminStates.DECOMMISSION_INPROGRESS);
dnInfos[1] = new DatanodeInfo("host1", "1", 5000, 5001, 20000, 10001, 9999,
59, 69, 32, "local", "host1", AdminStates.DECOMMISSIONED);
dnInfos[2] = new DatanodeInfo("host2", "2", 5000, 5001, 20000, 10001, 9999,
59, 69, 32, "local", "host1", AdminStates.NORMAL);
LocatedBlock lb = new LocatedBlock(
new ExtendedBlock("bp12", 12345, 10, 53), dnInfos, 5, false);
LocatedBlockProto lbProto = PBHelper.convert(lb);
LocatedBlock lb2 = PBHelper.convert(lbProto);
assertEquals(lb.getBlock(), lb2.getBlock());
compare(lb.getBlockToken(), lb2.getBlockToken());
assertEquals(lb.getStartOffset(), lb2.getStartOffset());
assertEquals(lb.isCorrupt(), lb2.isCorrupt());
DatanodeInfo [] dnInfos2 = lb2.getLocations();
assertEquals(dnInfos.length, dnInfos2.length);
for (int i = 0; i < dnInfos.length ; i++) {
compare(dnInfos[i], dnInfos2[i]);
}
}
@Test
public void testConvertDatanodeRegistration() {
DatanodeID dnId = new DatanodeID("host", "xyz", 1, 0);
BlockKey[] keys = new BlockKey[] { getBlockKey(2), getBlockKey(3) };
ExportedBlockKeys expKeys = new ExportedBlockKeys(true, 9, 10,
getBlockKey(1), keys);
DatanodeRegistration reg = new DatanodeRegistration(dnId,
new StorageInfo(), expKeys);
DatanodeRegistrationProto proto = PBHelper.convert(reg);
DatanodeRegistration reg2 = PBHelper.convert(proto);
compare(reg.storageInfo, reg2.storageInfo);
compare(reg.exportedKeys, reg2.exportedKeys);
compare((DatanodeID)reg, (DatanodeID)reg2);
}
@Test
public void testConvertBlockCommand() {
Block[] blocks = new Block[] { new Block(21), new Block(22) };
DatanodeInfo[][] dnInfos = new DatanodeInfo[][] { new DatanodeInfo[1],
new DatanodeInfo[2] };
dnInfos[0][0] = new DatanodeInfo();
dnInfos[1][0] = new DatanodeInfo();
dnInfos[1][1] = new DatanodeInfo();
BlockCommand bc = new BlockCommand(DatanodeProtocol.DNA_TRANSFER, "bp1",
blocks, dnInfos);
BlockCommandProto bcProto = PBHelper.convert(bc);
BlockCommand bc2 = PBHelper.convert(bcProto);
assertEquals(bc.getAction(), bc2.getAction());
assertEquals(bc.getBlocks().length, bc2.getBlocks().length);
Block[] blocks2 = bc2.getBlocks();
for (int i = 0; i < blocks.length; i++) {
assertEquals(blocks[i], blocks2[i]);
}
DatanodeInfo[][] dnInfos2 = bc2.getTargets();
assertEquals(dnInfos.length, dnInfos2.length);
for (int i = 0; i < dnInfos.length; i++) {
DatanodeInfo[] d1 = dnInfos[i];
DatanodeInfo[] d2 = dnInfos2[i];
assertEquals(d1.length, d2.length);
for (int j = 0; j < d1.length; j++) {
compare(d1[j], d2[j]);
}
}
}
}