Merge r1210719 and r1210746 from trunk for HDFS-2618.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23-PB@1230388 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
378be13d66
commit
00620ee2bd
|
@ -17,6 +17,8 @@ Release 0.23-PB - Unreleased
|
||||||
|
|
||||||
HDFS-2581. Implement protobuf service for JournalProtocol. (suresh)
|
HDFS-2581. Implement protobuf service for JournalProtocol. (suresh)
|
||||||
|
|
||||||
|
HDFS-2618. Implement protobuf service for NamenodeProtocol. (suresh)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
HDFS-2018. Move all journal stream management code into one place.
|
HDFS-2018. Move all journal stream management code into one place.
|
||||||
|
|
|
@ -53,7 +53,7 @@ public class JournalProtocolTranslatorPB implements JournalProtocol, Closeable {
|
||||||
Configuration conf) throws IOException {
|
Configuration conf) throws IOException {
|
||||||
RPC.setProtocolEngine(conf, JournalProtocolPB.class, ProtobufRpcEngine.class);
|
RPC.setProtocolEngine(conf, JournalProtocolPB.class, ProtobufRpcEngine.class);
|
||||||
rpcProxy = RPC.getProxy(JournalProtocolPB.class,
|
rpcProxy = RPC.getProxy(JournalProtocolPB.class,
|
||||||
JournalProtocol.versionID, nameNodeAddr, conf);
|
RPC.getProtocolVersion(JournalProtocolPB.class), nameNodeAddr, conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -64,7 +64,7 @@ public class JournalProtocolTranslatorPB implements JournalProtocol, Closeable {
|
||||||
@Override
|
@Override
|
||||||
public long getProtocolVersion(String protocolName, long clientVersion)
|
public long getProtocolVersion(String protocolName, long clientVersion)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return 0;
|
return rpcProxy.getProtocolVersion(protocolName, clientVersion);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -0,0 +1,53 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.protocolPB;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
|
||||||
|
import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
|
||||||
|
import org.apache.hadoop.ipc.ProtocolInfo;
|
||||||
|
import org.apache.hadoop.ipc.VersionedProtocol;
|
||||||
|
import org.apache.hadoop.security.KerberosInfo;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Protocol that a secondary NameNode uses to communicate with the NameNode.
|
||||||
|
* It's used to get part of the name node state
|
||||||
|
*
|
||||||
|
* Note: This extends the protocolbuffer service based interface to
|
||||||
|
* add annotations required for security.
|
||||||
|
*/
|
||||||
|
@KerberosInfo(
|
||||||
|
serverPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY,
|
||||||
|
clientPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY)
|
||||||
|
@ProtocolInfo(protocolName =
|
||||||
|
"org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol",
|
||||||
|
protocolVersion = 1)
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public interface NamenodeProtocolPB extends
|
||||||
|
NamenodeProtocolService.BlockingInterface, VersionedProtocol {
|
||||||
|
/**
|
||||||
|
* This method is defined to get the protocol signature using
|
||||||
|
* the R23 protocol - hence we have added the suffix of 2 the method name
|
||||||
|
* to avoid conflict.
|
||||||
|
*/
|
||||||
|
public ProtocolSignatureWritable getProtocolSignature2(String protocol,
|
||||||
|
long clientVersion, int clientMethodsHash) throws IOException;
|
||||||
|
}
|
|
@ -0,0 +1,253 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.protocolPB;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamespaceInfoProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointResponseProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.ErrorReportRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.ErrorReportResponseProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysResponseProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksResponseProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestResponseProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdResponseProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RegisterRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RegisterResponseProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RollEditLogRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RollEditLogResponseProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.StartCheckpointRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.StartCheckpointResponseProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.VersionRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.VersionResponseProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
|
||||||
|
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
|
||||||
|
import org.apache.hadoop.ipc.ProtocolSignature;
|
||||||
|
import org.apache.hadoop.ipc.RPC;
|
||||||
|
|
||||||
|
import com.google.protobuf.RpcController;
|
||||||
|
import com.google.protobuf.ServiceException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implementation for protobuf service that forwards requests
|
||||||
|
* received on {@link NamenodeProtocolPB} to the
|
||||||
|
* {@link NamenodeProtocol} server implementation.
|
||||||
|
*/
|
||||||
|
public class NamenodeProtocolServerSideTranslatorPB implements
|
||||||
|
NamenodeProtocolPB {
|
||||||
|
private final NamenodeProtocol impl;
|
||||||
|
|
||||||
|
public NamenodeProtocolServerSideTranslatorPB(NamenodeProtocol impl) {
|
||||||
|
this.impl = impl;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GetBlocksResponseProto getBlocks(RpcController unused,
|
||||||
|
GetBlocksRequestProto request) throws ServiceException {
|
||||||
|
DatanodeInfo dnInfo = new DatanodeInfo(PBHelper.convert(request
|
||||||
|
.getDatanode()));
|
||||||
|
BlocksWithLocations blocks;
|
||||||
|
try {
|
||||||
|
blocks = impl.getBlocks(dnInfo, request.getSize());
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
}
|
||||||
|
return GetBlocksResponseProto.newBuilder()
|
||||||
|
.setBlocks(PBHelper.convert(blocks)).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GetBlockKeysResponseProto getBlockKeys(RpcController unused,
|
||||||
|
GetBlockKeysRequestProto request) throws ServiceException {
|
||||||
|
ExportedBlockKeys keys;
|
||||||
|
try {
|
||||||
|
keys = impl.getBlockKeys();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
}
|
||||||
|
return GetBlockKeysResponseProto.newBuilder()
|
||||||
|
.setKeys(PBHelper.convert(keys)).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GetTransactionIdResponseProto getTransationId(RpcController unused,
|
||||||
|
GetTransactionIdRequestProto request) throws ServiceException {
|
||||||
|
long txid;
|
||||||
|
try {
|
||||||
|
txid = impl.getTransactionID();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
}
|
||||||
|
return GetTransactionIdResponseProto.newBuilder().setTxId(txid).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RollEditLogResponseProto rollEditLog(RpcController unused,
|
||||||
|
RollEditLogRequestProto request) throws ServiceException {
|
||||||
|
CheckpointSignature signature;
|
||||||
|
try {
|
||||||
|
signature = impl.rollEditLog();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
}
|
||||||
|
return RollEditLogResponseProto.newBuilder()
|
||||||
|
.setSignature(PBHelper.convert(signature)).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ErrorReportResponseProto errorReport(RpcController unused,
|
||||||
|
ErrorReportRequestProto request) throws ServiceException {
|
||||||
|
try {
|
||||||
|
impl.errorReport(PBHelper.convert(request.getRegistration()),
|
||||||
|
request.getErrorCode(), request.getMsg());
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
}
|
||||||
|
return ErrorReportResponseProto.newBuilder().build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RegisterResponseProto register(RpcController unused,
|
||||||
|
RegisterRequestProto request) throws ServiceException {
|
||||||
|
NamenodeRegistration reg;
|
||||||
|
try {
|
||||||
|
reg = impl.register(PBHelper.convert(request.getRegistration()));
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
}
|
||||||
|
return RegisterResponseProto.newBuilder()
|
||||||
|
.setRegistration(PBHelper.convert(reg)).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public StartCheckpointResponseProto startCheckpoint(RpcController unused,
|
||||||
|
StartCheckpointRequestProto request) throws ServiceException {
|
||||||
|
NamenodeCommand cmd;
|
||||||
|
try {
|
||||||
|
cmd = impl.startCheckpoint(PBHelper.convert(request.getRegistration()));
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
}
|
||||||
|
return StartCheckpointResponseProto.newBuilder()
|
||||||
|
.setCommand(PBHelper.convert(cmd)).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EndCheckpointResponseProto endCheckpoint(RpcController unused,
|
||||||
|
EndCheckpointRequestProto request) throws ServiceException {
|
||||||
|
try {
|
||||||
|
impl.endCheckpoint(PBHelper.convert(request.getRegistration()),
|
||||||
|
PBHelper.convert(request.getSignature()));
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
}
|
||||||
|
return EndCheckpointResponseProto.newBuilder().build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GetEditLogManifestResponseProto getEditLogManifest(
|
||||||
|
RpcController unused, GetEditLogManifestRequestProto request)
|
||||||
|
throws ServiceException {
|
||||||
|
RemoteEditLogManifest manifest;
|
||||||
|
try {
|
||||||
|
manifest = impl.getEditLogManifest(request.getSinceTxId());
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
}
|
||||||
|
return GetEditLogManifestResponseProto.newBuilder()
|
||||||
|
.setManifest(PBHelper.convert(manifest)).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getProtocolVersion(String protocol, long clientVersion)
|
||||||
|
throws IOException {
|
||||||
|
return RPC.getProtocolVersion(NamenodeProtocolPB.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The client side will redirect getProtocolSignature to
|
||||||
|
* getProtocolSignature2.
|
||||||
|
*
|
||||||
|
* However the RPC layer below on the Server side will call getProtocolVersion
|
||||||
|
* and possibly in the future getProtocolSignature. Hence we still implement
|
||||||
|
* it even though the end client will never call this method.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public ProtocolSignature getProtocolSignature(String protocol,
|
||||||
|
long clientVersion, int clientMethodsHash) throws IOException {
|
||||||
|
/**
|
||||||
|
* Don't forward this to the server. The protocol version and signature is
|
||||||
|
* that of {@link NamenodeProtocol}
|
||||||
|
*/
|
||||||
|
if (!protocol.equals(RPC.getProtocolName(NamenodeProtocolPB.class))) {
|
||||||
|
throw new IOException("Namenode Serverside implements " +
|
||||||
|
RPC.getProtocolName(NamenodeProtocolPB.class) +
|
||||||
|
". The following requested protocol is unknown: " + protocol);
|
||||||
|
}
|
||||||
|
|
||||||
|
return ProtocolSignature.getProtocolSignature(clientMethodsHash,
|
||||||
|
RPC.getProtocolVersion(NamenodeProtocolPB.class),
|
||||||
|
NamenodeProtocolPB.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ProtocolSignatureWritable getProtocolSignature2(String protocol,
|
||||||
|
long clientVersion, int clientMethodsHash) throws IOException {
|
||||||
|
/**
|
||||||
|
* Don't forward this to the server. The protocol version and signature is
|
||||||
|
* that of {@link NamenodePBProtocol}
|
||||||
|
*/
|
||||||
|
return ProtocolSignatureWritable.convert(
|
||||||
|
this.getProtocolSignature(protocol, clientVersion, clientMethodsHash));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public VersionResponseProto versionRequest(RpcController controller,
|
||||||
|
VersionRequestProto request) throws ServiceException {
|
||||||
|
NamespaceInfo info;
|
||||||
|
try {
|
||||||
|
info = impl.versionRequest();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
}
|
||||||
|
return VersionResponseProto.newBuilder()
|
||||||
|
.setInfo(convert(info)).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private NamespaceInfoProto convert(NamespaceInfo info) {
|
||||||
|
return NamespaceInfoProto.newBuilder()
|
||||||
|
.setBlockPoolID(info.getBlockPoolID())
|
||||||
|
.setBuildVersion(info.getBuildVersion())
|
||||||
|
.setDistUpgradeVersion(info.getDistributedUpgradeVersion())
|
||||||
|
.setStorageInfo(PBHelper.convert(info)).build();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,269 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.protocolPB;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeCommandProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.ErrorReportRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RegisterRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RollEditLogRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.StartCheckpointRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.VersionRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
|
||||||
|
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
|
||||||
|
import org.apache.hadoop.io.retry.RetryPolicies;
|
||||||
|
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||||
|
import org.apache.hadoop.io.retry.RetryProxy;
|
||||||
|
import org.apache.hadoop.ipc.ProtobufHelper;
|
||||||
|
import org.apache.hadoop.ipc.ProtocolSignature;
|
||||||
|
import org.apache.hadoop.ipc.RPC;
|
||||||
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
import org.apache.hadoop.net.NetUtils;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
|
||||||
|
import com.google.protobuf.RpcController;
|
||||||
|
import com.google.protobuf.ServiceException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class is the client side translator to translate the requests made on
|
||||||
|
* {@link NamenodeProtocol} interfaces to the RPC server implementing
|
||||||
|
* {@link NamenodeProtocolPB}.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Stable
|
||||||
|
public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
|
||||||
|
Closeable {
|
||||||
|
/** RpcController is not used and hence is set to null */
|
||||||
|
private final static RpcController NULL_CONTROLLER = null;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Protobuf requests with no parameters instantiated only once
|
||||||
|
*/
|
||||||
|
private static final GetBlockKeysRequestProto GET_BLOCKKEYS =
|
||||||
|
GetBlockKeysRequestProto.newBuilder().build();
|
||||||
|
private static final GetTransactionIdRequestProto GET_TRANSACTIONID =
|
||||||
|
GetTransactionIdRequestProto.newBuilder().build();
|
||||||
|
private static final RollEditLogRequestProto ROLL_EDITLOG =
|
||||||
|
RollEditLogRequestProto.newBuilder().build();
|
||||||
|
private static final VersionRequestProto VERSION_REQUEST =
|
||||||
|
VersionRequestProto.newBuilder().build();
|
||||||
|
|
||||||
|
final private NamenodeProtocolPB rpcProxy;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
private static NamenodeProtocolPB createNamenode(
|
||||||
|
InetSocketAddress nameNodeAddr, Configuration conf,
|
||||||
|
UserGroupInformation ugi) throws IOException {
|
||||||
|
return RPC.getProxy(NamenodeProtocolPB.class,
|
||||||
|
RPC.getProtocolVersion(NamenodeProtocolPB.class), nameNodeAddr, ugi,
|
||||||
|
conf, NetUtils.getSocketFactory(conf, NamenodeProtocolPB.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Create a {@link NameNode} proxy */
|
||||||
|
static NamenodeProtocolPB createNamenodeWithRetry(
|
||||||
|
NamenodeProtocolPB rpcNamenode) {
|
||||||
|
RetryPolicy createPolicy = RetryPolicies
|
||||||
|
.retryUpToMaximumCountWithFixedSleep(5,
|
||||||
|
HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
|
||||||
|
Map<Class<? extends Exception>, RetryPolicy> remoteExceptionToPolicyMap = new HashMap<Class<? extends Exception>, RetryPolicy>();
|
||||||
|
remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class,
|
||||||
|
createPolicy);
|
||||||
|
|
||||||
|
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap = new HashMap<Class<? extends Exception>, RetryPolicy>();
|
||||||
|
exceptionToPolicyMap.put(RemoteException.class, RetryPolicies
|
||||||
|
.retryByRemoteException(RetryPolicies.TRY_ONCE_THEN_FAIL,
|
||||||
|
remoteExceptionToPolicyMap));
|
||||||
|
RetryPolicy methodPolicy = RetryPolicies.retryByException(
|
||||||
|
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
|
||||||
|
Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<String, RetryPolicy>();
|
||||||
|
|
||||||
|
methodNameToPolicyMap.put("create", methodPolicy);
|
||||||
|
|
||||||
|
return (NamenodeProtocolPB) RetryProxy.create(NamenodeProtocolPB.class,
|
||||||
|
rpcNamenode, methodNameToPolicyMap);
|
||||||
|
}
|
||||||
|
|
||||||
|
public NamenodeProtocolTranslatorPB(InetSocketAddress nameNodeAddr,
|
||||||
|
Configuration conf, UserGroupInformation ugi) throws IOException {
|
||||||
|
rpcProxy = createNamenodeWithRetry(createNamenode(nameNodeAddr, conf, ugi));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void close() {
|
||||||
|
RPC.stopProxy(rpcProxy);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ProtocolSignature getProtocolSignature(String protocolName,
|
||||||
|
long clientVersion, int clientMethodHash) throws IOException {
|
||||||
|
return ProtocolSignatureWritable.convert(rpcProxy.getProtocolSignature2(
|
||||||
|
protocolName, clientVersion, clientMethodHash));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getProtocolVersion(String protocolName, long clientVersion)
|
||||||
|
throws IOException {
|
||||||
|
return rpcProxy.getProtocolVersion(protocolName, clientVersion);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
|
||||||
|
throws IOException {
|
||||||
|
GetBlocksRequestProto req = GetBlocksRequestProto.newBuilder()
|
||||||
|
.setDatanode(PBHelper.convert(datanode)).setSize(size)
|
||||||
|
.build();
|
||||||
|
try {
|
||||||
|
return PBHelper.convert(rpcProxy.getBlocks(NULL_CONTROLLER, req)
|
||||||
|
.getBlocks());
|
||||||
|
} catch (ServiceException e) {
|
||||||
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ExportedBlockKeys getBlockKeys() throws IOException {
|
||||||
|
try {
|
||||||
|
return PBHelper.convert(rpcProxy.getBlockKeys(NULL_CONTROLLER,
|
||||||
|
GET_BLOCKKEYS).getKeys());
|
||||||
|
} catch (ServiceException e) {
|
||||||
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getTransactionID() throws IOException {
|
||||||
|
try {
|
||||||
|
return rpcProxy.getTransationId(NULL_CONTROLLER, GET_TRANSACTIONID)
|
||||||
|
.getTxId();
|
||||||
|
} catch (ServiceException e) {
|
||||||
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
|
public CheckpointSignature rollEditLog() throws IOException {
|
||||||
|
try {
|
||||||
|
return PBHelper.convert(rpcProxy.rollEditLog(NULL_CONTROLLER,
|
||||||
|
ROLL_EDITLOG).getSignature());
|
||||||
|
} catch (ServiceException e) {
|
||||||
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public NamespaceInfo versionRequest() throws IOException {
|
||||||
|
try {
|
||||||
|
return PBHelper.convert(rpcProxy.versionRequest(NULL_CONTROLLER,
|
||||||
|
VERSION_REQUEST).getInfo());
|
||||||
|
} catch (ServiceException e) {
|
||||||
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void errorReport(NamenodeRegistration registration, int errorCode,
|
||||||
|
String msg) throws IOException {
|
||||||
|
ErrorReportRequestProto req = ErrorReportRequestProto.newBuilder()
|
||||||
|
.setErrorCode(errorCode).setMsg(msg)
|
||||||
|
.setRegistration(PBHelper.convert(registration)).build();
|
||||||
|
try {
|
||||||
|
rpcProxy.errorReport(NULL_CONTROLLER, req);
|
||||||
|
} catch (ServiceException e) {
|
||||||
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public NamenodeRegistration register(NamenodeRegistration registration)
|
||||||
|
throws IOException {
|
||||||
|
RegisterRequestProto req = RegisterRequestProto.newBuilder()
|
||||||
|
.setRegistration(PBHelper.convert(registration)).build();
|
||||||
|
try {
|
||||||
|
return PBHelper.convert(rpcProxy.register(NULL_CONTROLLER, req)
|
||||||
|
.getRegistration());
|
||||||
|
} catch (ServiceException e) {
|
||||||
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
|
||||||
|
throws IOException {
|
||||||
|
StartCheckpointRequestProto req = StartCheckpointRequestProto.newBuilder()
|
||||||
|
.setRegistration(PBHelper.convert(registration)).build();
|
||||||
|
NamenodeCommandProto cmd;
|
||||||
|
try {
|
||||||
|
cmd = rpcProxy.startCheckpoint(NULL_CONTROLLER, req).getCommand();
|
||||||
|
} catch (ServiceException e) {
|
||||||
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
|
}
|
||||||
|
return PBHelper.convert(cmd);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void endCheckpoint(NamenodeRegistration registration,
|
||||||
|
CheckpointSignature sig) throws IOException {
|
||||||
|
EndCheckpointRequestProto req = EndCheckpointRequestProto.newBuilder()
|
||||||
|
.setRegistration(PBHelper.convert(registration))
|
||||||
|
.setSignature(PBHelper.convert(sig)).build();
|
||||||
|
try {
|
||||||
|
rpcProxy.endCheckpoint(NULL_CONTROLLER, req);
|
||||||
|
} catch (ServiceException e) {
|
||||||
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
|
||||||
|
throws IOException {
|
||||||
|
GetEditLogManifestRequestProto req = GetEditLogManifestRequestProto
|
||||||
|
.newBuilder().setSinceTxId(sinceTxId).build();
|
||||||
|
try {
|
||||||
|
return PBHelper.convert(rpcProxy.getEditLogManifest(NULL_CONTROLLER, req)
|
||||||
|
.getManifest());
|
||||||
|
} catch (ServiceException e) {
|
||||||
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -17,18 +17,45 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.protocolPB;
|
package org.apache.hadoop.hdfs.protocolPB;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointCommandProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointSignatureProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExportedBlockKeysProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeCommandProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamespaceInfoProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogManifestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto.NamenodeRoleProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto.NamenodeRoleProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
|
||||||
|
import org.apache.hadoop.hdfs.security.token.block.BlockKey;
|
||||||
|
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
||||||
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
|
||||||
|
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Utilities for converting protobuf classes to and from
|
* Utilities for converting protobuf classes to and from implementation classes.
|
||||||
* implementation classes.
|
|
||||||
*/
|
*/
|
||||||
class PBHelper {
|
class PBHelper {
|
||||||
private PBHelper() {
|
private PBHelper() {
|
||||||
|
@ -65,10 +92,8 @@ class PBHelper {
|
||||||
|
|
||||||
public static StorageInfoProto convert(StorageInfo info) {
|
public static StorageInfoProto convert(StorageInfo info) {
|
||||||
return StorageInfoProto.newBuilder().setClusterID(info.getClusterID())
|
return StorageInfoProto.newBuilder().setClusterID(info.getClusterID())
|
||||||
.setCTime(info.getCTime())
|
.setCTime(info.getCTime()).setLayoutVersion(info.getLayoutVersion())
|
||||||
.setLayoutVersion(info.getLayoutVersion())
|
.setNamespceID(info.getNamespaceID()).build();
|
||||||
.setNamespceID(info.getNamespaceID())
|
|
||||||
.build();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static StorageInfo convert(StorageInfoProto info) {
|
public static StorageInfo convert(StorageInfoProto info) {
|
||||||
|
@ -76,11 +101,9 @@ class PBHelper {
|
||||||
info.getClusterID(), info.getCTime());
|
info.getClusterID(), info.getCTime());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public static NamenodeRegistrationProto convert(NamenodeRegistration reg) {
|
public static NamenodeRegistrationProto convert(NamenodeRegistration reg) {
|
||||||
return NamenodeRegistrationProto.newBuilder()
|
return NamenodeRegistrationProto.newBuilder()
|
||||||
.setHttpAddress(reg.getHttpAddress())
|
.setHttpAddress(reg.getHttpAddress()).setRole(convert(reg.getRole()))
|
||||||
.setRole(convert(reg.getRole()))
|
|
||||||
.setRpcAddress(reg.getAddress())
|
.setRpcAddress(reg.getAddress())
|
||||||
.setStorageInfo(convert((StorageInfo) reg)).build();
|
.setStorageInfo(convert((StorageInfo) reg)).build();
|
||||||
}
|
}
|
||||||
|
@ -89,4 +112,173 @@ class PBHelper {
|
||||||
return new NamenodeRegistration(reg.getRpcAddress(), reg.getHttpAddress(),
|
return new NamenodeRegistration(reg.getRpcAddress(), reg.getHttpAddress(),
|
||||||
convert(reg.getStorageInfo()), convert(reg.getRole()));
|
convert(reg.getStorageInfo()), convert(reg.getRole()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static DatanodeID convert(DatanodeIDProto dn) {
|
||||||
|
return new DatanodeID(dn.getName(), dn.getStorageID(), dn.getInfoPort(),
|
||||||
|
dn.getIpcPort());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static DatanodeIDProto convert(DatanodeID dn) {
|
||||||
|
return DatanodeIDProto.newBuilder().setName(dn.getName())
|
||||||
|
.setInfoPort(dn.getInfoPort()).setIpcPort(dn.getIpcPort())
|
||||||
|
.setStorageID(dn.getStorageID()).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static BlockProto convert(Block b) {
|
||||||
|
return BlockProto.newBuilder().setBlockId(b.getBlockId())
|
||||||
|
.setGenStamp(b.getGenerationStamp()).setNumBytes(b.getNumBytes())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Block convert(BlockProto b) {
|
||||||
|
return new Block(b.getBlockId(), b.getGenStamp(), b.getNumBytes());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static BlockWithLocationsProto convert(BlockWithLocations blk) {
|
||||||
|
return BlockWithLocationsProto.newBuilder()
|
||||||
|
.setBlock(convert(blk.getBlock()))
|
||||||
|
.addAllDatanodeIDs(Arrays.asList(blk.getDatanodes())).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static BlockWithLocations convert(BlockWithLocationsProto b) {
|
||||||
|
return new BlockWithLocations(convert(b.getBlock()), b.getDatanodeIDsList()
|
||||||
|
.toArray(new String[0]));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static BlocksWithLocationsProto convert(BlocksWithLocations blks) {
|
||||||
|
BlocksWithLocationsProto.Builder builder = BlocksWithLocationsProto
|
||||||
|
.newBuilder();
|
||||||
|
for (BlockWithLocations b : blks.getBlocks()) {
|
||||||
|
builder.addBlocks(convert(b));
|
||||||
|
}
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static BlocksWithLocations convert(BlocksWithLocationsProto blocks) {
|
||||||
|
return new BlocksWithLocations(convert(blocks.getBlocksList()));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static BlockKeyProto convert(BlockKey key) {
|
||||||
|
byte[] encodedKey = key.getEncodedKey();
|
||||||
|
ByteString keyBytes = ByteString.copyFrom(encodedKey == null ? new byte[0]
|
||||||
|
: encodedKey);
|
||||||
|
return BlockKeyProto.newBuilder().setKeyId(key.getKeyId())
|
||||||
|
.setKeyBytes(keyBytes).setExpiryDate(key.getExpiryDate()).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static BlockKey convert(BlockKeyProto k) {
|
||||||
|
return new BlockKey(k.getKeyId(), k.getExpiryDate(), k.getKeyBytes()
|
||||||
|
.toByteArray());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ExportedBlockKeysProto convert(ExportedBlockKeys keys) {
|
||||||
|
ExportedBlockKeysProto.Builder builder = ExportedBlockKeysProto
|
||||||
|
.newBuilder();
|
||||||
|
builder.setIsBlockTokenEnabled(keys.isBlockTokenEnabled())
|
||||||
|
.setKeyUpdateInterval(keys.getKeyUpdateInterval())
|
||||||
|
.setTokenLifeTime(keys.getTokenLifetime())
|
||||||
|
.setCurrentKey(convert(keys.getCurrentKey()));
|
||||||
|
for (BlockKey k : keys.getAllKeys()) {
|
||||||
|
builder.addAllKeys(convert(k));
|
||||||
|
}
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ExportedBlockKeys convert(ExportedBlockKeysProto keys) {
|
||||||
|
return new ExportedBlockKeys(keys.getIsBlockTokenEnabled(),
|
||||||
|
keys.getKeyUpdateInterval(), keys.getTokenLifeTime(),
|
||||||
|
convert(keys.getCurrentKey()), convertBlockKeys(keys.getAllKeysList()));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static CheckpointSignatureProto convert(CheckpointSignature s) {
|
||||||
|
return CheckpointSignatureProto.newBuilder()
|
||||||
|
.setBlockPoolId(s.getBlockpoolID())
|
||||||
|
.setCurSegmentTxId(s.getCurSegmentTxId())
|
||||||
|
.setMostRecentCheckpointTxId(s.getMostRecentCheckpointTxId())
|
||||||
|
.setStorageInfo(PBHelper.convert((StorageInfo) s)).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static CheckpointSignature convert(CheckpointSignatureProto s) {
|
||||||
|
return new CheckpointSignature(PBHelper.convert(s.getStorageInfo()),
|
||||||
|
s.getBlockPoolId(), s.getMostRecentCheckpointTxId(),
|
||||||
|
s.getCurSegmentTxId());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static RemoteEditLogProto convert(RemoteEditLog log) {
|
||||||
|
return RemoteEditLogProto.newBuilder().setEndTxId(log.getEndTxId())
|
||||||
|
.setStartTxId(log.getStartTxId()).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static RemoteEditLog convert(RemoteEditLogProto l) {
|
||||||
|
return new RemoteEditLog(l.getStartTxId(), l.getEndTxId());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static RemoteEditLogManifestProto convert(
|
||||||
|
RemoteEditLogManifest manifest) {
|
||||||
|
RemoteEditLogManifestProto.Builder builder = RemoteEditLogManifestProto
|
||||||
|
.newBuilder();
|
||||||
|
for (RemoteEditLog log : manifest.getLogs()) {
|
||||||
|
builder.addLogs(convert(log));
|
||||||
|
}
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static RemoteEditLogManifest convert(
|
||||||
|
RemoteEditLogManifestProto manifest) {
|
||||||
|
List<RemoteEditLog> logs = new ArrayList<RemoteEditLog>(manifest
|
||||||
|
.getLogsList().size());
|
||||||
|
for (RemoteEditLogProto l : manifest.getLogsList()) {
|
||||||
|
logs.add(convert(l));
|
||||||
|
}
|
||||||
|
return new RemoteEditLogManifest(logs);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static CheckpointCommandProto convert(CheckpointCommand cmd) {
|
||||||
|
return CheckpointCommandProto.newBuilder()
|
||||||
|
.setSignature(convert(cmd.getSignature())).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static NamenodeCommandProto convert(NamenodeCommand cmd) {
|
||||||
|
if (cmd instanceof CheckpointCommand) {
|
||||||
|
return NamenodeCommandProto.newBuilder().setAction(cmd.getAction())
|
||||||
|
.setType(NamenodeCommandProto.Type.NamenodeCommand)
|
||||||
|
.setCheckpointCmd(convert((CheckpointCommand) cmd)).build();
|
||||||
|
}
|
||||||
|
return NamenodeCommandProto.newBuilder().setAction(cmd.getAction()).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static BlockWithLocations[] convert(List<BlockWithLocationsProto> b) {
|
||||||
|
BlockWithLocations[] ret = new BlockWithLocations[b.size()];
|
||||||
|
int i = 0;
|
||||||
|
for (BlockWithLocationsProto entry : b) {
|
||||||
|
ret[i++] = convert(entry);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static BlockKey[] convertBlockKeys(List<BlockKeyProto> list) {
|
||||||
|
BlockKey[] ret = new BlockKey[list.size()];
|
||||||
|
int i = 0;
|
||||||
|
for (BlockKeyProto k : list) {
|
||||||
|
ret[i++] = convert(k);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static NamespaceInfo convert(NamespaceInfoProto info) {
|
||||||
|
StorageInfoProto storage = info.getStorageInfo();
|
||||||
|
return new NamespaceInfo(storage.getNamespceID(), storage.getClusterID(),
|
||||||
|
info.getBlockPoolID(), storage.getCTime(), info.getDistUpgradeVersion());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static NamenodeCommand convert(NamenodeCommandProto cmd) {
|
||||||
|
switch (cmd.getType()) {
|
||||||
|
case CheckPointCommand:
|
||||||
|
CheckpointCommandProto chkPt = cmd.getCheckpointCmd();
|
||||||
|
return new CheckpointCommand(PBHelper.convert(chkPt.getSignature()),
|
||||||
|
chkPt.getNeedToReturnImage());
|
||||||
|
default:
|
||||||
|
return new NamenodeCommand(cmd.getAction());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -36,4 +36,8 @@ public class BlockKey extends DelegationKey {
|
||||||
public BlockKey(int keyId, long expiryDate, SecretKey key) {
|
public BlockKey(int keyId, long expiryDate, SecretKey key) {
|
||||||
super(keyId, expiryDate, key);
|
super(keyId, expiryDate, key);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public BlockKey(int keyId, long expiryDate, byte[] encodedKey) {
|
||||||
|
super(keyId, expiryDate, encodedKey);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,7 +42,7 @@ message GetBlocksRequestProto {
|
||||||
* blocks - List of returned blocks
|
* blocks - List of returned blocks
|
||||||
*/
|
*/
|
||||||
message GetBlocksResponseProto {
|
message GetBlocksResponseProto {
|
||||||
required BlockWithLocationsProto blocks = 1; // List of blocks
|
required BlocksWithLocationsProto blocks = 1; // List of blocks
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -85,12 +85,25 @@ message RollEditLogResponseProto {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* registartion - Namenode reporting the error
|
* void request
|
||||||
|
*/
|
||||||
|
message VersionRequestProto {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* void request
|
||||||
|
*/
|
||||||
|
message VersionResponseProto {
|
||||||
|
required NamespaceInfoProto info = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* registration - Namenode reporting the error
|
||||||
* errorCode - error code indicating the error
|
* errorCode - error code indicating the error
|
||||||
* msg - Free text description of the error
|
* msg - Free text description of the error
|
||||||
*/
|
*/
|
||||||
message ErrorReportRequestProto {
|
message ErrorReportRequestProto {
|
||||||
required NamenodeRegistrationProto registartion = 1; // Registartion info
|
required NamenodeRegistrationProto registration = 1; // Registration info
|
||||||
required uint32 errorCode = 2; // Error code
|
required uint32 errorCode = 2; // Error code
|
||||||
required string msg = 3; // Error message
|
required string msg = 3; // Error message
|
||||||
}
|
}
|
||||||
|
@ -193,6 +206,11 @@ service NamenodeProtocolService {
|
||||||
*/
|
*/
|
||||||
rpc rollEditLog(RollEditLogRequestProto) returns(RollEditLogResponseProto);
|
rpc rollEditLog(RollEditLogRequestProto) returns(RollEditLogResponseProto);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Close the current editlog and open a new one for checkpointing purposes
|
||||||
|
*/
|
||||||
|
rpc versionRequest(VersionRequestProto) returns(VersionResponseProto);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Report from a sub-ordinate namenode of an error to the active namenode.
|
* Report from a sub-ordinate namenode of an error to the active namenode.
|
||||||
* Active namenode may decide to unregister the reporting namenode
|
* Active namenode may decide to unregister the reporting namenode
|
||||||
|
|
|
@ -271,7 +271,7 @@ message BlockProto {
|
||||||
*/
|
*/
|
||||||
message BlockWithLocationsProto {
|
message BlockWithLocationsProto {
|
||||||
required BlockProto block = 1; // Block
|
required BlockProto block = 1; // Block
|
||||||
repeated DatanodeIDProto datanodeIDs = 2; // Datanodes with replicas of the block
|
repeated string datanodeIDs = 2; // Datanodes with replicas of the block
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -19,12 +19,35 @@ package org.apache.hadoop.hdfs.protocolPB;
|
||||||
|
|
||||||
import static junit.framework.Assert.*;
|
import static junit.framework.Assert.*;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointCommandProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointSignatureProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExportedBlockKeysProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto.NamenodeRoleProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto.NamenodeRoleProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogManifestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
|
||||||
|
import org.apache.hadoop.hdfs.security.token.block.BlockKey;
|
||||||
|
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
||||||
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -47,9 +70,13 @@ public class TestPBHelper {
|
||||||
PBHelper.convert(NamenodeRoleProto.NAMENODE));
|
PBHelper.convert(NamenodeRoleProto.NAMENODE));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static StorageInfo getStorageInfo() {
|
||||||
|
return new StorageInfo(1, 2, "cid", 3);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConvertStoragInfo() {
|
public void testConvertStoragInfo() {
|
||||||
StorageInfo info = new StorageInfo(1, 2, "cid", 3);
|
StorageInfo info = getStorageInfo();
|
||||||
StorageInfoProto infoProto = PBHelper.convert(info);
|
StorageInfoProto infoProto = PBHelper.convert(info);
|
||||||
StorageInfo info2 = PBHelper.convert(infoProto);
|
StorageInfo info2 = PBHelper.convert(infoProto);
|
||||||
assertEquals(info.getClusterID(), info2.getClusterID());
|
assertEquals(info.getClusterID(), info2.getClusterID());
|
||||||
|
@ -60,7 +87,7 @@ public class TestPBHelper {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConvertNamenodeRegistration() {
|
public void testConvertNamenodeRegistration() {
|
||||||
StorageInfo info = new StorageInfo(1, 2, "cid", 3);
|
StorageInfo info = getStorageInfo();
|
||||||
NamenodeRegistration reg = new NamenodeRegistration("address:999",
|
NamenodeRegistration reg = new NamenodeRegistration("address:999",
|
||||||
"http:1000", info, NamenodeRole.NAMENODE);
|
"http:1000", info, NamenodeRole.NAMENODE);
|
||||||
NamenodeRegistrationProto regProto = PBHelper.convert(reg);
|
NamenodeRegistrationProto regProto = PBHelper.convert(reg);
|
||||||
|
@ -76,4 +103,142 @@ public class TestPBHelper {
|
||||||
assertEquals(reg.getVersion(), reg2.getVersion());
|
assertEquals(reg.getVersion(), reg2.getVersion());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConvertDatanodeID() {
|
||||||
|
DatanodeID dn = new DatanodeID("node", "sid", 1, 2);
|
||||||
|
DatanodeIDProto dnProto = PBHelper.convert(dn);
|
||||||
|
DatanodeID dn2 = PBHelper.convert(dnProto);
|
||||||
|
assertEquals(dn.getHost(), dn2.getHost());
|
||||||
|
assertEquals(dn.getInfoPort(), dn2.getInfoPort());
|
||||||
|
assertEquals(dn.getIpcPort(), dn2.getIpcPort());
|
||||||
|
assertEquals(dn.getName(), dn2.getName());
|
||||||
|
assertEquals(dn.getPort(), dn2.getPort());
|
||||||
|
assertEquals(dn.getStorageID(), dn2.getStorageID());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConvertBlock() {
|
||||||
|
Block b = new Block(1, 100, 3);
|
||||||
|
BlockProto bProto = PBHelper.convert(b);
|
||||||
|
Block b2 = PBHelper.convert(bProto);
|
||||||
|
assertEquals(b, b2);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static BlockWithLocations getBlockWithLocations(int bid) {
|
||||||
|
return new BlockWithLocations(new Block(bid, 0, 1), new String[] { "dn1",
|
||||||
|
"dn2", "dn3" });
|
||||||
|
}
|
||||||
|
|
||||||
|
private void compare(BlockWithLocations locs1, BlockWithLocations locs2) {
|
||||||
|
assertEquals(locs1.getBlock(), locs2.getBlock());
|
||||||
|
assertTrue(Arrays.equals(locs1.getDatanodes(), locs2.getDatanodes()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConvertBlockWithLocations() {
|
||||||
|
BlockWithLocations locs = getBlockWithLocations(1);
|
||||||
|
BlockWithLocationsProto locsProto = PBHelper.convert(locs);
|
||||||
|
BlockWithLocations locs2 = PBHelper.convert(locsProto);
|
||||||
|
compare(locs, locs2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConvertBlocksWithLocations() {
|
||||||
|
BlockWithLocations[] list = new BlockWithLocations[] {
|
||||||
|
getBlockWithLocations(1), getBlockWithLocations(2) };
|
||||||
|
BlocksWithLocations locs = new BlocksWithLocations(list);
|
||||||
|
BlocksWithLocationsProto locsProto = PBHelper.convert(locs);
|
||||||
|
BlocksWithLocations locs2 = PBHelper.convert(locsProto);
|
||||||
|
BlockWithLocations[] blocks = locs.getBlocks();
|
||||||
|
BlockWithLocations[] blocks2 = locs2.getBlocks();
|
||||||
|
assertEquals(blocks.length, blocks2.length);
|
||||||
|
for (int i = 0; i < blocks.length; i++) {
|
||||||
|
compare(blocks[i], blocks2[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static BlockKey getBlockKey(int keyId) {
|
||||||
|
return new BlockKey(keyId, 10, "encodedKey".getBytes());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void compare(BlockKey k1, BlockKey k2) {
|
||||||
|
assertEquals(k1.getExpiryDate(), k2.getExpiryDate());
|
||||||
|
assertEquals(k1.getKeyId(), k2.getKeyId());
|
||||||
|
assertTrue(Arrays.equals(k1.getEncodedKey(), k2.getEncodedKey()));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConvertBlockKey() {
|
||||||
|
BlockKey key = getBlockKey(1);
|
||||||
|
BlockKeyProto keyProto = PBHelper.convert(key);
|
||||||
|
BlockKey key1 = PBHelper.convert(keyProto);
|
||||||
|
compare(key, key1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConvertExportedBlockKeys() {
|
||||||
|
BlockKey[] keys = new BlockKey[] { getBlockKey(2), getBlockKey(3) };
|
||||||
|
ExportedBlockKeys expKeys = new ExportedBlockKeys(true, 9, 10,
|
||||||
|
getBlockKey(1), keys);
|
||||||
|
ExportedBlockKeysProto expKeysProto = PBHelper.convert(expKeys);
|
||||||
|
ExportedBlockKeys expKeys1 = PBHelper.convert(expKeysProto);
|
||||||
|
|
||||||
|
BlockKey[] allKeys = expKeys.getAllKeys();
|
||||||
|
BlockKey[] allKeys1 = expKeys1.getAllKeys();
|
||||||
|
assertEquals(allKeys.length, allKeys1.length);
|
||||||
|
for (int i = 0; i < allKeys.length; i++) {
|
||||||
|
compare(allKeys[i], allKeys1[i]);
|
||||||
|
}
|
||||||
|
compare(expKeys.getCurrentKey(), expKeys1.getCurrentKey());
|
||||||
|
assertEquals(expKeys.getKeyUpdateInterval(),
|
||||||
|
expKeys1.getKeyUpdateInterval());
|
||||||
|
assertEquals(expKeys.getTokenLifetime(), expKeys1.getTokenLifetime());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConvertCheckpointSignature() {
|
||||||
|
CheckpointSignature s = new CheckpointSignature(getStorageInfo(), "bpid",
|
||||||
|
100, 1);
|
||||||
|
CheckpointSignatureProto sProto = PBHelper.convert(s);
|
||||||
|
CheckpointSignature s1 = PBHelper.convert(sProto);
|
||||||
|
assertEquals(s.getBlockpoolID(), s1.getBlockpoolID());
|
||||||
|
assertEquals(s.getClusterID(), s1.getClusterID());
|
||||||
|
assertEquals(s.getCTime(), s1.getCTime());
|
||||||
|
assertEquals(s.getCurSegmentTxId(), s1.getCurSegmentTxId());
|
||||||
|
assertEquals(s.getLayoutVersion(), s1.getLayoutVersion());
|
||||||
|
assertEquals(s.getMostRecentCheckpointTxId(),
|
||||||
|
s1.getMostRecentCheckpointTxId());
|
||||||
|
assertEquals(s.getNamespaceID(), s1.getNamespaceID());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void compare(RemoteEditLog l1, RemoteEditLog l2) {
|
||||||
|
assertEquals(l1.getEndTxId(), l2.getEndTxId());
|
||||||
|
assertEquals(l1.getStartTxId(), l2.getStartTxId());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConvertRemoteEditLog() {
|
||||||
|
RemoteEditLog l = new RemoteEditLog(1, 100);
|
||||||
|
RemoteEditLogProto lProto = PBHelper.convert(l);
|
||||||
|
RemoteEditLog l1 = PBHelper.convert(lProto);
|
||||||
|
compare(l, l1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConvertRemoteEditLogManifest() {
|
||||||
|
List<RemoteEditLog> logs = new ArrayList<RemoteEditLog>();
|
||||||
|
logs.add(new RemoteEditLog(1, 10));
|
||||||
|
logs.add(new RemoteEditLog(11, 20));
|
||||||
|
RemoteEditLogManifest m = new RemoteEditLogManifest(logs);
|
||||||
|
RemoteEditLogManifestProto mProto = PBHelper.convert(m);
|
||||||
|
RemoteEditLogManifest m1 = PBHelper.convert(mProto);
|
||||||
|
|
||||||
|
List<RemoteEditLog> logs1 = m1.getLogs();
|
||||||
|
assertEquals(logs.size(), logs1.size());
|
||||||
|
for (int i = 0; i < logs.size(); i++) {
|
||||||
|
compare(logs.get(i), logs1.get(i));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue