diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index c66ce76d891..032b7548608 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -17,6 +17,8 @@ Release 0.23-PB - Unreleased HDFS-2581. Implement protobuf service for JournalProtocol. (suresh) + HDFS-2618. Implement protobuf service for NamenodeProtocol. (suresh) + IMPROVEMENTS HDFS-2018. Move all journal stream management code into one place. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java index adddf9a2f4b..58aff28b70d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java @@ -53,7 +53,7 @@ public class JournalProtocolTranslatorPB implements JournalProtocol, Closeable { Configuration conf) throws IOException { RPC.setProtocolEngine(conf, JournalProtocolPB.class, ProtobufRpcEngine.class); rpcProxy = RPC.getProxy(JournalProtocolPB.class, - JournalProtocol.versionID, nameNodeAddr, conf); + RPC.getProtocolVersion(JournalProtocolPB.class), nameNodeAddr, conf); } @Override @@ -64,7 +64,7 @@ public class JournalProtocolTranslatorPB implements JournalProtocol, Closeable { @Override public long getProtocolVersion(String protocolName, long clientVersion) throws IOException { - return 0; + return rpcProxy.getProtocolVersion(protocolName, clientVersion); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolPB.java new file mode 100644 index 00000000000..6138b44cc91 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolPB.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolPB; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService; +import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable; +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.ipc.VersionedProtocol; +import org.apache.hadoop.security.KerberosInfo; + +/** + * Protocol that a secondary NameNode uses to communicate with the NameNode. + * It's used to get part of the name node state + * + * Note: This extends the protocolbuffer service based interface to + * add annotations required for security. + */ +@KerberosInfo( + serverPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, + clientPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY) +@ProtocolInfo(protocolName = + "org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol", + protocolVersion = 1) +@InterfaceAudience.Private +public interface NamenodeProtocolPB extends + NamenodeProtocolService.BlockingInterface, VersionedProtocol { + /** + * This method is defined to get the protocol signature using + * the R23 protocol - hence we have added the suffix of 2 the method name + * to avoid conflict. + */ + public ProtocolSignatureWritable getProtocolSignature2(String protocol, + long clientVersion, int clientMethodsHash) throws IOException; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java new file mode 100644 index 00000000000..5d7fe28e0c7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java @@ -0,0 +1,253 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolPB; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamespaceInfoProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.ErrorReportRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.ErrorReportResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RegisterRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RegisterResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RollEditLogRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RollEditLogResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.StartCheckpointRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.StartCheckpointResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.VersionRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.VersionResponseProto; +import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable; +import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; +import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; +import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.RPC; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +/** + * Implementation for protobuf service that forwards requests + * received on {@link NamenodeProtocolPB} to the + * {@link NamenodeProtocol} server implementation. + */ +public class NamenodeProtocolServerSideTranslatorPB implements + NamenodeProtocolPB { + private final NamenodeProtocol impl; + + public NamenodeProtocolServerSideTranslatorPB(NamenodeProtocol impl) { + this.impl = impl; + } + + @Override + public GetBlocksResponseProto getBlocks(RpcController unused, + GetBlocksRequestProto request) throws ServiceException { + DatanodeInfo dnInfo = new DatanodeInfo(PBHelper.convert(request + .getDatanode())); + BlocksWithLocations blocks; + try { + blocks = impl.getBlocks(dnInfo, request.getSize()); + } catch (IOException e) { + throw new ServiceException(e); + } + return GetBlocksResponseProto.newBuilder() + .setBlocks(PBHelper.convert(blocks)).build(); + } + + @Override + public GetBlockKeysResponseProto getBlockKeys(RpcController unused, + GetBlockKeysRequestProto request) throws ServiceException { + ExportedBlockKeys keys; + try { + keys = impl.getBlockKeys(); + } catch (IOException e) { + throw new ServiceException(e); + } + return GetBlockKeysResponseProto.newBuilder() + .setKeys(PBHelper.convert(keys)).build(); + } + + @Override + public GetTransactionIdResponseProto getTransationId(RpcController unused, + GetTransactionIdRequestProto request) throws ServiceException { + long txid; + try { + txid = impl.getTransactionID(); + } catch (IOException e) { + throw new ServiceException(e); + } + return GetTransactionIdResponseProto.newBuilder().setTxId(txid).build(); + } + + @Override + public RollEditLogResponseProto rollEditLog(RpcController unused, + RollEditLogRequestProto request) throws ServiceException { + CheckpointSignature signature; + try { + signature = impl.rollEditLog(); + } catch (IOException e) { + throw new ServiceException(e); + } + return RollEditLogResponseProto.newBuilder() + .setSignature(PBHelper.convert(signature)).build(); + } + + @Override + public ErrorReportResponseProto errorReport(RpcController unused, + ErrorReportRequestProto request) throws ServiceException { + try { + impl.errorReport(PBHelper.convert(request.getRegistration()), + request.getErrorCode(), request.getMsg()); + } catch (IOException e) { + throw new ServiceException(e); + } + return ErrorReportResponseProto.newBuilder().build(); + } + + @Override + public RegisterResponseProto register(RpcController unused, + RegisterRequestProto request) throws ServiceException { + NamenodeRegistration reg; + try { + reg = impl.register(PBHelper.convert(request.getRegistration())); + } catch (IOException e) { + throw new ServiceException(e); + } + return RegisterResponseProto.newBuilder() + .setRegistration(PBHelper.convert(reg)).build(); + } + + @Override + public StartCheckpointResponseProto startCheckpoint(RpcController unused, + StartCheckpointRequestProto request) throws ServiceException { + NamenodeCommand cmd; + try { + cmd = impl.startCheckpoint(PBHelper.convert(request.getRegistration())); + } catch (IOException e) { + throw new ServiceException(e); + } + return StartCheckpointResponseProto.newBuilder() + .setCommand(PBHelper.convert(cmd)).build(); + } + + @Override + public EndCheckpointResponseProto endCheckpoint(RpcController unused, + EndCheckpointRequestProto request) throws ServiceException { + try { + impl.endCheckpoint(PBHelper.convert(request.getRegistration()), + PBHelper.convert(request.getSignature())); + } catch (IOException e) { + throw new ServiceException(e); + } + return EndCheckpointResponseProto.newBuilder().build(); + } + + @Override + public GetEditLogManifestResponseProto getEditLogManifest( + RpcController unused, GetEditLogManifestRequestProto request) + throws ServiceException { + RemoteEditLogManifest manifest; + try { + manifest = impl.getEditLogManifest(request.getSinceTxId()); + } catch (IOException e) { + throw new ServiceException(e); + } + return GetEditLogManifestResponseProto.newBuilder() + .setManifest(PBHelper.convert(manifest)).build(); + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + return RPC.getProtocolVersion(NamenodeProtocolPB.class); + } + + /** + * The client side will redirect getProtocolSignature to + * getProtocolSignature2. + * + * However the RPC layer below on the Server side will call getProtocolVersion + * and possibly in the future getProtocolSignature. Hence we still implement + * it even though the end client will never call this method. + */ + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + /** + * Don't forward this to the server. The protocol version and signature is + * that of {@link NamenodeProtocol} + */ + if (!protocol.equals(RPC.getProtocolName(NamenodeProtocolPB.class))) { + throw new IOException("Namenode Serverside implements " + + RPC.getProtocolName(NamenodeProtocolPB.class) + + ". The following requested protocol is unknown: " + protocol); + } + + return ProtocolSignature.getProtocolSignature(clientMethodsHash, + RPC.getProtocolVersion(NamenodeProtocolPB.class), + NamenodeProtocolPB.class); + } + + + @Override + public ProtocolSignatureWritable getProtocolSignature2(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + /** + * Don't forward this to the server. The protocol version and signature is + * that of {@link NamenodePBProtocol} + */ + return ProtocolSignatureWritable.convert( + this.getProtocolSignature(protocol, clientVersion, clientMethodsHash)); + } + + @Override + public VersionResponseProto versionRequest(RpcController controller, + VersionRequestProto request) throws ServiceException { + NamespaceInfo info; + try { + info = impl.versionRequest(); + } catch (IOException e) { + throw new ServiceException(e); + } + return VersionResponseProto.newBuilder() + .setInfo(convert(info)).build(); + } + + private NamespaceInfoProto convert(NamespaceInfo info) { + return NamespaceInfoProto.newBuilder() + .setBlockPoolID(info.getBlockPoolID()) + .setBuildVersion(info.getBuildVersion()) + .setDistUpgradeVersion(info.getDistributedUpgradeVersion()) + .setStorageInfo(PBHelper.convert(info)).build(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java new file mode 100644 index 00000000000..86e0d9c81eb --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolPB; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeCommandProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.ErrorReportRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RegisterRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RollEditLogRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.StartCheckpointRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.VersionRequestProto; +import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable; +import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; +import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; +import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.ipc.ProtobufHelper; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +/** + * This class is the client side translator to translate the requests made on + * {@link NamenodeProtocol} interfaces to the RPC server implementing + * {@link NamenodeProtocolPB}. + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class NamenodeProtocolTranslatorPB implements NamenodeProtocol, + Closeable { + /** RpcController is not used and hence is set to null */ + private final static RpcController NULL_CONTROLLER = null; + + /* + * Protobuf requests with no parameters instantiated only once + */ + private static final GetBlockKeysRequestProto GET_BLOCKKEYS = + GetBlockKeysRequestProto.newBuilder().build(); + private static final GetTransactionIdRequestProto GET_TRANSACTIONID = + GetTransactionIdRequestProto.newBuilder().build(); + private static final RollEditLogRequestProto ROLL_EDITLOG = + RollEditLogRequestProto.newBuilder().build(); + private static final VersionRequestProto VERSION_REQUEST = + VersionRequestProto.newBuilder().build(); + + final private NamenodeProtocolPB rpcProxy; + + + + private static NamenodeProtocolPB createNamenode( + InetSocketAddress nameNodeAddr, Configuration conf, + UserGroupInformation ugi) throws IOException { + return RPC.getProxy(NamenodeProtocolPB.class, + RPC.getProtocolVersion(NamenodeProtocolPB.class), nameNodeAddr, ugi, + conf, NetUtils.getSocketFactory(conf, NamenodeProtocolPB.class)); + } + + /** Create a {@link NameNode} proxy */ + static NamenodeProtocolPB createNamenodeWithRetry( + NamenodeProtocolPB rpcNamenode) { + RetryPolicy createPolicy = RetryPolicies + .retryUpToMaximumCountWithFixedSleep(5, + HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS); + Map, RetryPolicy> remoteExceptionToPolicyMap = new HashMap, RetryPolicy>(); + remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class, + createPolicy); + + Map, RetryPolicy> exceptionToPolicyMap = new HashMap, RetryPolicy>(); + exceptionToPolicyMap.put(RemoteException.class, RetryPolicies + .retryByRemoteException(RetryPolicies.TRY_ONCE_THEN_FAIL, + remoteExceptionToPolicyMap)); + RetryPolicy methodPolicy = RetryPolicies.retryByException( + RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); + Map methodNameToPolicyMap = new HashMap(); + + methodNameToPolicyMap.put("create", methodPolicy); + + return (NamenodeProtocolPB) RetryProxy.create(NamenodeProtocolPB.class, + rpcNamenode, methodNameToPolicyMap); + } + + public NamenodeProtocolTranslatorPB(InetSocketAddress nameNodeAddr, + Configuration conf, UserGroupInformation ugi) throws IOException { + rpcProxy = createNamenodeWithRetry(createNamenode(nameNodeAddr, conf, ugi)); + } + + public void close() { + RPC.stopProxy(rpcProxy); + } + + @Override + public ProtocolSignature getProtocolSignature(String protocolName, + long clientVersion, int clientMethodHash) throws IOException { + return ProtocolSignatureWritable.convert(rpcProxy.getProtocolSignature2( + protocolName, clientVersion, clientMethodHash)); + } + + @Override + public long getProtocolVersion(String protocolName, long clientVersion) + throws IOException { + return rpcProxy.getProtocolVersion(protocolName, clientVersion); + } + + @Override + public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size) + throws IOException { + GetBlocksRequestProto req = GetBlocksRequestProto.newBuilder() + .setDatanode(PBHelper.convert(datanode)).setSize(size) + .build(); + try { + return PBHelper.convert(rpcProxy.getBlocks(NULL_CONTROLLER, req) + .getBlocks()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public ExportedBlockKeys getBlockKeys() throws IOException { + try { + return PBHelper.convert(rpcProxy.getBlockKeys(NULL_CONTROLLER, + GET_BLOCKKEYS).getKeys()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public long getTransactionID() throws IOException { + try { + return rpcProxy.getTransationId(NULL_CONTROLLER, GET_TRANSACTIONID) + .getTxId(); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + @SuppressWarnings("deprecation") + public CheckpointSignature rollEditLog() throws IOException { + try { + return PBHelper.convert(rpcProxy.rollEditLog(NULL_CONTROLLER, + ROLL_EDITLOG).getSignature()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public NamespaceInfo versionRequest() throws IOException { + try { + return PBHelper.convert(rpcProxy.versionRequest(NULL_CONTROLLER, + VERSION_REQUEST).getInfo()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void errorReport(NamenodeRegistration registration, int errorCode, + String msg) throws IOException { + ErrorReportRequestProto req = ErrorReportRequestProto.newBuilder() + .setErrorCode(errorCode).setMsg(msg) + .setRegistration(PBHelper.convert(registration)).build(); + try { + rpcProxy.errorReport(NULL_CONTROLLER, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public NamenodeRegistration register(NamenodeRegistration registration) + throws IOException { + RegisterRequestProto req = RegisterRequestProto.newBuilder() + .setRegistration(PBHelper.convert(registration)).build(); + try { + return PBHelper.convert(rpcProxy.register(NULL_CONTROLLER, req) + .getRegistration()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public NamenodeCommand startCheckpoint(NamenodeRegistration registration) + throws IOException { + StartCheckpointRequestProto req = StartCheckpointRequestProto.newBuilder() + .setRegistration(PBHelper.convert(registration)).build(); + NamenodeCommandProto cmd; + try { + cmd = rpcProxy.startCheckpoint(NULL_CONTROLLER, req).getCommand(); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + return PBHelper.convert(cmd); + } + + @Override + public void endCheckpoint(NamenodeRegistration registration, + CheckpointSignature sig) throws IOException { + EndCheckpointRequestProto req = EndCheckpointRequestProto.newBuilder() + .setRegistration(PBHelper.convert(registration)) + .setSignature(PBHelper.convert(sig)).build(); + try { + rpcProxy.endCheckpoint(NULL_CONTROLLER, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public RemoteEditLogManifest getEditLogManifest(long sinceTxId) + throws IOException { + GetEditLogManifestRequestProto req = GetEditLogManifestRequestProto + .newBuilder().setSinceTxId(sinceTxId).build(); + try { + return PBHelper.convert(rpcProxy.getEditLogManifest(NULL_CONTROLLER, req) + .getManifest()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 598c7fb4163..9e5ae94cfb6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -17,28 +17,55 @@ */ package org.apache.hadoop.hdfs.protocolPB; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointCommandProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointSignatureProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExportedBlockKeysProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeCommandProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamespaceInfoProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogManifestProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto.NamenodeRoleProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto; +import org.apache.hadoop.hdfs.security.token.block.BlockKey; +import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.StorageInfo; +import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; +import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand; +import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; +import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import com.google.protobuf.ByteString; /** - * Utilities for converting protobuf classes to and from - * implementation classes. + * Utilities for converting protobuf classes to and from implementation classes. */ class PBHelper { private PBHelper() { /** Hidden constructor */ } - + public static ByteString getByteString(byte[] bytes) { return ByteString.copyFrom(bytes); } - + public static NamenodeRole convert(NamenodeRoleProto role) { switch (role) { case NAMENODE: @@ -50,7 +77,7 @@ class PBHelper { } return null; } - + public static NamenodeRoleProto convert(NamenodeRole role) { switch (role) { case NAMENODE: @@ -62,31 +89,196 @@ class PBHelper { } return null; } - + public static StorageInfoProto convert(StorageInfo info) { return StorageInfoProto.newBuilder().setClusterID(info.getClusterID()) - .setCTime(info.getCTime()) - .setLayoutVersion(info.getLayoutVersion()) - .setNamespceID(info.getNamespaceID()) - .build(); + .setCTime(info.getCTime()).setLayoutVersion(info.getLayoutVersion()) + .setNamespceID(info.getNamespaceID()).build(); } - + public static StorageInfo convert(StorageInfoProto info) { return new StorageInfo(info.getLayoutVersion(), info.getNamespceID(), info.getClusterID(), info.getCTime()); } - - + public static NamenodeRegistrationProto convert(NamenodeRegistration reg) { return NamenodeRegistrationProto.newBuilder() - .setHttpAddress(reg.getHttpAddress()) - .setRole(convert(reg.getRole())) + .setHttpAddress(reg.getHttpAddress()).setRole(convert(reg.getRole())) .setRpcAddress(reg.getAddress()) .setStorageInfo(convert((StorageInfo) reg)).build(); } - + public static NamenodeRegistration convert(NamenodeRegistrationProto reg) { return new NamenodeRegistration(reg.getRpcAddress(), reg.getHttpAddress(), convert(reg.getStorageInfo()), convert(reg.getRole())); } -} + + public static DatanodeID convert(DatanodeIDProto dn) { + return new DatanodeID(dn.getName(), dn.getStorageID(), dn.getInfoPort(), + dn.getIpcPort()); + } + + public static DatanodeIDProto convert(DatanodeID dn) { + return DatanodeIDProto.newBuilder().setName(dn.getName()) + .setInfoPort(dn.getInfoPort()).setIpcPort(dn.getIpcPort()) + .setStorageID(dn.getStorageID()).build(); + } + + public static BlockProto convert(Block b) { + return BlockProto.newBuilder().setBlockId(b.getBlockId()) + .setGenStamp(b.getGenerationStamp()).setNumBytes(b.getNumBytes()) + .build(); + } + + public static Block convert(BlockProto b) { + return new Block(b.getBlockId(), b.getGenStamp(), b.getNumBytes()); + } + + public static BlockWithLocationsProto convert(BlockWithLocations blk) { + return BlockWithLocationsProto.newBuilder() + .setBlock(convert(blk.getBlock())) + .addAllDatanodeIDs(Arrays.asList(blk.getDatanodes())).build(); + } + + public static BlockWithLocations convert(BlockWithLocationsProto b) { + return new BlockWithLocations(convert(b.getBlock()), b.getDatanodeIDsList() + .toArray(new String[0])); + } + + public static BlocksWithLocationsProto convert(BlocksWithLocations blks) { + BlocksWithLocationsProto.Builder builder = BlocksWithLocationsProto + .newBuilder(); + for (BlockWithLocations b : blks.getBlocks()) { + builder.addBlocks(convert(b)); + } + return builder.build(); + } + + public static BlocksWithLocations convert(BlocksWithLocationsProto blocks) { + return new BlocksWithLocations(convert(blocks.getBlocksList())); + } + + public static BlockKeyProto convert(BlockKey key) { + byte[] encodedKey = key.getEncodedKey(); + ByteString keyBytes = ByteString.copyFrom(encodedKey == null ? new byte[0] + : encodedKey); + return BlockKeyProto.newBuilder().setKeyId(key.getKeyId()) + .setKeyBytes(keyBytes).setExpiryDate(key.getExpiryDate()).build(); + } + + public static BlockKey convert(BlockKeyProto k) { + return new BlockKey(k.getKeyId(), k.getExpiryDate(), k.getKeyBytes() + .toByteArray()); + } + + public static ExportedBlockKeysProto convert(ExportedBlockKeys keys) { + ExportedBlockKeysProto.Builder builder = ExportedBlockKeysProto + .newBuilder(); + builder.setIsBlockTokenEnabled(keys.isBlockTokenEnabled()) + .setKeyUpdateInterval(keys.getKeyUpdateInterval()) + .setTokenLifeTime(keys.getTokenLifetime()) + .setCurrentKey(convert(keys.getCurrentKey())); + for (BlockKey k : keys.getAllKeys()) { + builder.addAllKeys(convert(k)); + } + return builder.build(); + } + + public static ExportedBlockKeys convert(ExportedBlockKeysProto keys) { + return new ExportedBlockKeys(keys.getIsBlockTokenEnabled(), + keys.getKeyUpdateInterval(), keys.getTokenLifeTime(), + convert(keys.getCurrentKey()), convertBlockKeys(keys.getAllKeysList())); + } + + public static CheckpointSignatureProto convert(CheckpointSignature s) { + return CheckpointSignatureProto.newBuilder() + .setBlockPoolId(s.getBlockpoolID()) + .setCurSegmentTxId(s.getCurSegmentTxId()) + .setMostRecentCheckpointTxId(s.getMostRecentCheckpointTxId()) + .setStorageInfo(PBHelper.convert((StorageInfo) s)).build(); + } + + public static CheckpointSignature convert(CheckpointSignatureProto s) { + return new CheckpointSignature(PBHelper.convert(s.getStorageInfo()), + s.getBlockPoolId(), s.getMostRecentCheckpointTxId(), + s.getCurSegmentTxId()); + } + + public static RemoteEditLogProto convert(RemoteEditLog log) { + return RemoteEditLogProto.newBuilder().setEndTxId(log.getEndTxId()) + .setStartTxId(log.getStartTxId()).build(); + } + + public static RemoteEditLog convert(RemoteEditLogProto l) { + return new RemoteEditLog(l.getStartTxId(), l.getEndTxId()); + } + + public static RemoteEditLogManifestProto convert( + RemoteEditLogManifest manifest) { + RemoteEditLogManifestProto.Builder builder = RemoteEditLogManifestProto + .newBuilder(); + for (RemoteEditLog log : manifest.getLogs()) { + builder.addLogs(convert(log)); + } + return builder.build(); + } + + public static RemoteEditLogManifest convert( + RemoteEditLogManifestProto manifest) { + List logs = new ArrayList(manifest + .getLogsList().size()); + for (RemoteEditLogProto l : manifest.getLogsList()) { + logs.add(convert(l)); + } + return new RemoteEditLogManifest(logs); + } + + public static CheckpointCommandProto convert(CheckpointCommand cmd) { + return CheckpointCommandProto.newBuilder() + .setSignature(convert(cmd.getSignature())).build(); + } + + public static NamenodeCommandProto convert(NamenodeCommand cmd) { + if (cmd instanceof CheckpointCommand) { + return NamenodeCommandProto.newBuilder().setAction(cmd.getAction()) + .setType(NamenodeCommandProto.Type.NamenodeCommand) + .setCheckpointCmd(convert((CheckpointCommand) cmd)).build(); + } + return NamenodeCommandProto.newBuilder().setAction(cmd.getAction()).build(); + } + + public static BlockWithLocations[] convert(List b) { + BlockWithLocations[] ret = new BlockWithLocations[b.size()]; + int i = 0; + for (BlockWithLocationsProto entry : b) { + ret[i++] = convert(entry); + } + return ret; + } + + public static BlockKey[] convertBlockKeys(List list) { + BlockKey[] ret = new BlockKey[list.size()]; + int i = 0; + for (BlockKeyProto k : list) { + ret[i++] = convert(k); + } + return ret; + } + + public static NamespaceInfo convert(NamespaceInfoProto info) { + StorageInfoProto storage = info.getStorageInfo(); + return new NamespaceInfo(storage.getNamespceID(), storage.getClusterID(), + info.getBlockPoolID(), storage.getCTime(), info.getDistUpgradeVersion()); + } + + public static NamenodeCommand convert(NamenodeCommandProto cmd) { + switch (cmd.getType()) { + case CheckPointCommand: + CheckpointCommandProto chkPt = cmd.getCheckpointCmd(); + return new CheckpointCommand(PBHelper.convert(chkPt.getSignature()), + chkPt.getNeedToReturnImage()); + default: + return new NamenodeCommand(cmd.getAction()); + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java index 004ff1056a4..8a630bab9c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java @@ -36,4 +36,8 @@ public class BlockKey extends DelegationKey { public BlockKey(int keyId, long expiryDate, SecretKey key) { super(keyId, expiryDate, key); } + + public BlockKey(int keyId, long expiryDate, byte[] encodedKey) { + super(keyId, expiryDate, encodedKey); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto index 27fa6d19952..25f8722da79 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto @@ -42,7 +42,7 @@ message GetBlocksRequestProto { * blocks - List of returned blocks */ message GetBlocksResponseProto { - required BlockWithLocationsProto blocks = 1; // List of blocks + required BlocksWithLocationsProto blocks = 1; // List of blocks } /** @@ -85,12 +85,25 @@ message RollEditLogResponseProto { } /** - * registartion - Namenode reporting the error + * void request + */ +message VersionRequestProto { +} + +/** + * void request + */ +message VersionResponseProto { + required NamespaceInfoProto info = 1; +} + +/** + * registration - Namenode reporting the error * errorCode - error code indicating the error * msg - Free text description of the error */ message ErrorReportRequestProto { - required NamenodeRegistrationProto registartion = 1; // Registartion info + required NamenodeRegistrationProto registration = 1; // Registration info required uint32 errorCode = 2; // Error code required string msg = 3; // Error message } @@ -193,6 +206,11 @@ service NamenodeProtocolService { */ rpc rollEditLog(RollEditLogRequestProto) returns(RollEditLogResponseProto); + /** + * Close the current editlog and open a new one for checkpointing purposes + */ + rpc versionRequest(VersionRequestProto) returns(VersionResponseProto); + /** * Report from a sub-ordinate namenode of an error to the active namenode. * Active namenode may decide to unregister the reporting namenode diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto index 8b86066980d..3e8431829b7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto @@ -270,8 +270,8 @@ message BlockProto { * Block and datanodes where is it located */ message BlockWithLocationsProto { - required BlockProto block = 1; // Block - repeated DatanodeIDProto datanodeIDs = 2; // Datanodes with replicas of the block + required BlockProto block = 1; // Block + repeated string datanodeIDs = 2; // Datanodes with replicas of the block } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java index 85aa91b9140..ddf0ad07233 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java @@ -19,12 +19,35 @@ package org.apache.hadoop.hdfs.protocolPB; import static junit.framework.Assert.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointCommandProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointSignatureProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExportedBlockKeysProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto.NamenodeRoleProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogManifestProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto; +import org.apache.hadoop.hdfs.security.token.block.BlockKey; +import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.StorageInfo; +import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; +import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.junit.Test; /** @@ -46,10 +69,14 @@ public class TestPBHelper { assertEquals(NamenodeRole.NAMENODE, PBHelper.convert(NamenodeRoleProto.NAMENODE)); } - + + private static StorageInfo getStorageInfo() { + return new StorageInfo(1, 2, "cid", 3); + } + @Test public void testConvertStoragInfo() { - StorageInfo info = new StorageInfo(1, 2, "cid", 3); + StorageInfo info = getStorageInfo(); StorageInfoProto infoProto = PBHelper.convert(info); StorageInfo info2 = PBHelper.convert(infoProto); assertEquals(info.getClusterID(), info2.getClusterID()); @@ -57,10 +84,10 @@ public class TestPBHelper { assertEquals(info.getLayoutVersion(), info2.getLayoutVersion()); assertEquals(info.getNamespaceID(), info2.getNamespaceID()); } - + @Test public void testConvertNamenodeRegistration() { - StorageInfo info = new StorageInfo(1, 2, "cid", 3); + StorageInfo info = getStorageInfo(); NamenodeRegistration reg = new NamenodeRegistration("address:999", "http:1000", info, NamenodeRole.NAMENODE); NamenodeRegistrationProto regProto = PBHelper.convert(reg); @@ -74,6 +101,144 @@ public class TestPBHelper { assertEquals(reg.getRegistrationID(), reg2.getRegistrationID()); assertEquals(reg.getRole(), reg2.getRole()); assertEquals(reg.getVersion(), reg2.getVersion()); + + } + + @Test + public void testConvertDatanodeID() { + DatanodeID dn = new DatanodeID("node", "sid", 1, 2); + DatanodeIDProto dnProto = PBHelper.convert(dn); + DatanodeID dn2 = PBHelper.convert(dnProto); + assertEquals(dn.getHost(), dn2.getHost()); + assertEquals(dn.getInfoPort(), dn2.getInfoPort()); + assertEquals(dn.getIpcPort(), dn2.getIpcPort()); + assertEquals(dn.getName(), dn2.getName()); + assertEquals(dn.getPort(), dn2.getPort()); + assertEquals(dn.getStorageID(), dn2.getStorageID()); + } + + @Test + public void testConvertBlock() { + Block b = new Block(1, 100, 3); + BlockProto bProto = PBHelper.convert(b); + Block b2 = PBHelper.convert(bProto); + assertEquals(b, b2); + } + + private static BlockWithLocations getBlockWithLocations(int bid) { + return new BlockWithLocations(new Block(bid, 0, 1), new String[] { "dn1", + "dn2", "dn3" }); + } + + private void compare(BlockWithLocations locs1, BlockWithLocations locs2) { + assertEquals(locs1.getBlock(), locs2.getBlock()); + assertTrue(Arrays.equals(locs1.getDatanodes(), locs2.getDatanodes())); + } + + @Test + public void testConvertBlockWithLocations() { + BlockWithLocations locs = getBlockWithLocations(1); + BlockWithLocationsProto locsProto = PBHelper.convert(locs); + BlockWithLocations locs2 = PBHelper.convert(locsProto); + compare(locs, locs2); + } + + @Test + public void testConvertBlocksWithLocations() { + BlockWithLocations[] list = new BlockWithLocations[] { + getBlockWithLocations(1), getBlockWithLocations(2) }; + BlocksWithLocations locs = new BlocksWithLocations(list); + BlocksWithLocationsProto locsProto = PBHelper.convert(locs); + BlocksWithLocations locs2 = PBHelper.convert(locsProto); + BlockWithLocations[] blocks = locs.getBlocks(); + BlockWithLocations[] blocks2 = locs2.getBlocks(); + assertEquals(blocks.length, blocks2.length); + for (int i = 0; i < blocks.length; i++) { + compare(blocks[i], blocks2[i]); + } + } + + private static BlockKey getBlockKey(int keyId) { + return new BlockKey(keyId, 10, "encodedKey".getBytes()); + } + + private void compare(BlockKey k1, BlockKey k2) { + assertEquals(k1.getExpiryDate(), k2.getExpiryDate()); + assertEquals(k1.getKeyId(), k2.getKeyId()); + assertTrue(Arrays.equals(k1.getEncodedKey(), k2.getEncodedKey())); + + } + + @Test + public void testConvertBlockKey() { + BlockKey key = getBlockKey(1); + BlockKeyProto keyProto = PBHelper.convert(key); + BlockKey key1 = PBHelper.convert(keyProto); + compare(key, key1); + } + + @Test + public void testConvertExportedBlockKeys() { + BlockKey[] keys = new BlockKey[] { getBlockKey(2), getBlockKey(3) }; + ExportedBlockKeys expKeys = new ExportedBlockKeys(true, 9, 10, + getBlockKey(1), keys); + ExportedBlockKeysProto expKeysProto = PBHelper.convert(expKeys); + ExportedBlockKeys expKeys1 = PBHelper.convert(expKeysProto); + + BlockKey[] allKeys = expKeys.getAllKeys(); + BlockKey[] allKeys1 = expKeys1.getAllKeys(); + assertEquals(allKeys.length, allKeys1.length); + for (int i = 0; i < allKeys.length; i++) { + compare(allKeys[i], allKeys1[i]); + } + compare(expKeys.getCurrentKey(), expKeys1.getCurrentKey()); + assertEquals(expKeys.getKeyUpdateInterval(), + expKeys1.getKeyUpdateInterval()); + assertEquals(expKeys.getTokenLifetime(), expKeys1.getTokenLifetime()); + } + + @Test + public void testConvertCheckpointSignature() { + CheckpointSignature s = new CheckpointSignature(getStorageInfo(), "bpid", + 100, 1); + CheckpointSignatureProto sProto = PBHelper.convert(s); + CheckpointSignature s1 = PBHelper.convert(sProto); + assertEquals(s.getBlockpoolID(), s1.getBlockpoolID()); + assertEquals(s.getClusterID(), s1.getClusterID()); + assertEquals(s.getCTime(), s1.getCTime()); + assertEquals(s.getCurSegmentTxId(), s1.getCurSegmentTxId()); + assertEquals(s.getLayoutVersion(), s1.getLayoutVersion()); + assertEquals(s.getMostRecentCheckpointTxId(), + s1.getMostRecentCheckpointTxId()); + assertEquals(s.getNamespaceID(), s1.getNamespaceID()); + } + + private static void compare(RemoteEditLog l1, RemoteEditLog l2) { + assertEquals(l1.getEndTxId(), l2.getEndTxId()); + assertEquals(l1.getStartTxId(), l2.getStartTxId()); + } + + @Test + public void testConvertRemoteEditLog() { + RemoteEditLog l = new RemoteEditLog(1, 100); + RemoteEditLogProto lProto = PBHelper.convert(l); + RemoteEditLog l1 = PBHelper.convert(lProto); + compare(l, l1); + } + + @Test + public void testConvertRemoteEditLogManifest() { + List logs = new ArrayList(); + logs.add(new RemoteEditLog(1, 10)); + logs.add(new RemoteEditLog(11, 20)); + RemoteEditLogManifest m = new RemoteEditLogManifest(logs); + RemoteEditLogManifestProto mProto = PBHelper.convert(m); + RemoteEditLogManifest m1 = PBHelper.convert(mProto); + List logs1 = m1.getLogs(); + assertEquals(logs.size(), logs1.size()); + for (int i = 0; i < logs.size(); i++) { + compare(logs.get(i), logs1.get(i)); + } } }