diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 1c1394ccaca..c66ce76d891 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -5,7 +5,7 @@ Release 0.23-PB - Unreleased
   NEW FEATURES
 
     HDFS-395. DFS Scalability: Incremental block reports. (Tomasz Nykiel
-    via hairong)
+              via hairong)
 
     HDFS-2517. Add protobuf service for JounralProtocol. (suresh)
 
@@ -15,6 +15,8 @@ Release 0.23-PB - Unreleased
 
     HDFS-2519. Add protobuf service for DatanodeProtocol. (suresh)
 
+    HDFS-2581. Implement protobuf service for JournalProtocol. (suresh)
+
   IMPROVEMENTS
 
     HDFS-2018. Move all journal stream management code into one place.
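For orientation (annotation, not part of the patch): the new classes below implement the translator pattern Hadoop uses to move an existing RPC protocol onto protocol buffers. JournalProtocol is the pre-existing Java interface; everything else is new. The call path, roughly:

    caller side                                  receiver side
    -----------                                  -------------
    JournalProtocol call
      -> JournalProtocolTranslatorPB
           (Java arguments -> protobuf request)
        -> JournalProtocolPB proxy  == Hadoop RPC ==>  JournalProtocolServerSideTranslatorPB
                                                         (protobuf request -> Java arguments)
                                                       -> JournalProtocol implementation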
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolPB.java
new file mode 100644
index 00000000000..ebbdcb3d5c4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolPB.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocolPB;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalProtocolService;
+import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+/**
+ * Protocol used to journal edits to a remote node. Currently,
+ * this is used to publish edits from the NameNode to a BackupNode.
+ *
+ * Note: This extends the protocol buffer service-based interface to
+ * add annotations required for security.
+ */
+@KerberosInfo(
+    serverPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY,
+    clientPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY)
+@ProtocolInfo(protocolName =
+    "org.apache.hadoop.hdfs.server.protocol.JournalProtocol",
+    protocolVersion = 1)
+@InterfaceAudience.Private
+public interface JournalProtocolPB extends
+    JournalProtocolService.BlockingInterface, VersionedProtocol {
+  /**
+   * This method is defined to get the protocol signature using
+   * the R23 protocol - hence we have added the suffix 2 to the
+   * method name to avoid conflict.
+   */
+  public ProtocolSignatureWritable getProtocolSignature2(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException;
+}
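For context: the JournalProtocolService.BlockingInterface extended above is generated by protoc from the JournalProtocol.proto service definition added by HDFS-2517. Its shape is roughly the following sketch (the real interface is generated code; only the method signatures matter here):

    public interface BlockingInterface {
      JournalResponseProto journal(RpcController controller,
          JournalRequestProto request) throws ServiceException;
      StartLogSegmentResponseProto startLogSegment(RpcController controller,
          StartLogSegmentRequestProto request) throws ServiceException;
    }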
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java
new file mode 100644
index 00000000000..389bf154d32
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocolPB;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentResponseProto;
+import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
+import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * Implementation for protobuf service that forwards requests
+ * received on {@link JournalProtocolPB} to the
+ * {@link JournalProtocol} server implementation.
+ */
+public class JournalProtocolServerSideTranslatorPB implements JournalProtocolPB {
+  /** Server side implementation to delegate the requests to. */
+  private final JournalProtocol impl;
+
+  public JournalProtocolServerSideTranslatorPB(JournalProtocol impl) {
+    this.impl = impl;
+  }
+
+  /** @see JournalProtocol#journal */
+  @Override
+  public JournalResponseProto journal(RpcController unused,
+      JournalRequestProto req) throws ServiceException {
+    try {
+      impl.journal(PBHelper.convert(req.getRegistration()),
+          req.getFirstTxnId(), req.getNumTxns(), req.getRecords()
+              .toByteArray());
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return JournalResponseProto.newBuilder().build();
+  }
+
+  /** @see JournalProtocol#startLogSegment */
+  @Override
+  public StartLogSegmentResponseProto startLogSegment(RpcController controller,
+      StartLogSegmentRequestProto req) throws ServiceException {
+    try {
+      impl.startLogSegment(PBHelper.convert(req.getRegistration()),
+          req.getTxid());
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return StartLogSegmentResponseProto.newBuilder().build();
+  }
+
+  /** @see VersionedProtocol#getProtocolVersion */
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion)
+      throws IOException {
+    return RPC.getProtocolVersion(JournalProtocolPB.class);
+  }
+
+  /**
+   * The client side will redirect getProtocolSignature to
+   * getProtocolSignature2.
+   *
+   * However, the RPC layer below on the server side will call
+   * getProtocolVersion and possibly in the future getProtocolSignature.
+   * Hence we still implement it even though the end client will never
+   * call this method.
+   *
+   * @see VersionedProtocol#getProtocolSignature(String, long, int)
+   */
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    /**
+     * Don't forward this to the server. The protocol version and signature
+     * are those of {@link JournalProtocol}.
+     */
+    if (!protocol.equals(RPC.getProtocolName(JournalProtocolPB.class))) {
+      throw new IOException("Namenode server-side implements " +
+          RPC.getProtocolName(JournalProtocolPB.class) +
+          ". The following requested protocol is unknown: " + protocol);
+    }
+
+    return ProtocolSignature.getProtocolSignature(clientMethodsHash,
+        RPC.getProtocolVersion(JournalProtocolPB.class),
+        JournalProtocolPB.class);
+  }
+
+  @Override
+  public ProtocolSignatureWritable getProtocolSignature2(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    /**
+     * Don't forward this to the server. The protocol version and signature
+     * are those of {@link JournalProtocolPB}.
+     */
+    return ProtocolSignatureWritable.convert(
+        this.getProtocolSignature(protocol, clientVersion, clientMethodsHash));
+  }
+}
\ No newline at end of file
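A note on the server-side wiring (annotation; the registration code is not part of this patch): the translator above becomes dispatchable by wrapping it in the protobuf-generated BlockingService. A minimal sketch, assuming the generated JournalProtocolService and an import of com.google.protobuf.BlockingService; newJournalService is a hypothetical helper name:

    static BlockingService newJournalService(JournalProtocol impl) {
      JournalProtocolServerSideTranslatorPB translator =
          new JournalProtocolServerSideTranslatorPB(impl);
      // Reflective dispatch: the RPC engine hands each decoded protobuf
      // request to this service, which invokes the matching translator method.
      return JournalProtocolService.newReflectiveBlockingService(translator);
    }

The returned service would then be registered with the Hadoop RPC server, which is how decoded requests reach journal() and startLogSegment() above.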
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java
new file mode 100644
index 00000000000..adddf9a2f4b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocolPB;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto;
+import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
+import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * This class is the client side translator to translate the requests made on
+ * the {@link JournalProtocol} interface to the RPC server implementing
+ * {@link JournalProtocolPB}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public class JournalProtocolTranslatorPB implements JournalProtocol, Closeable {
+  /** RpcController is not used and hence is set to null. */
+  private final static RpcController NULL_CONTROLLER = null;
+  private final JournalProtocolPB rpcProxy;
+
+  public JournalProtocolTranslatorPB(InetSocketAddress nameNodeAddr,
+      Configuration conf) throws IOException {
+    RPC.setProtocolEngine(conf, JournalProtocolPB.class,
+        ProtobufRpcEngine.class);
+    rpcProxy = RPC.getProxy(JournalProtocolPB.class,
+        JournalProtocol.versionID, nameNodeAddr, conf);
+  }
+
+  @Override
+  public void close() {
+    RPC.stopProxy(rpcProxy);
+  }
+
+  @Override
+  public long getProtocolVersion(String protocolName, long clientVersion)
+      throws IOException {
+    return 0;
+  }
+
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    return ProtocolSignatureWritable.convert(rpcProxy.getProtocolSignature2(
+        protocol, clientVersion, clientMethodsHash));
+  }
+
+  @Override
+  public void journal(NamenodeRegistration reg, long firstTxnId,
+      int numTxns, byte[] records) throws IOException {
+    JournalRequestProto req = JournalRequestProto.newBuilder()
+        .setRegistration(PBHelper.convert(reg))
+        .setFirstTxnId(firstTxnId)
+        .setNumTxns(numTxns)
+        .setRecords(PBHelper.getByteString(records))
+        .build();
+    try {
+      rpcProxy.journal(NULL_CONTROLLER, req);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public void startLogSegment(NamenodeRegistration registration, long txid)
+      throws IOException {
+    StartLogSegmentRequestProto req = StartLogSegmentRequestProto.newBuilder()
+        .setRegistration(PBHelper.convert(registration))
+        .setTxid(txid)
+        .build();
+    try {
+      rpcProxy.startLogSegment(NULL_CONTROLLER, req);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+}
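Client-side usage would look roughly like the following sketch (annotation; sendEdits and all argument values are illustrative, not part of the patch):

    static void sendEdits(InetSocketAddress nnAddr, Configuration conf,
        NamenodeRegistration reg, long segmentTxId, byte[] records)
        throws IOException {
      JournalProtocolTranslatorPB journal =
          new JournalProtocolTranslatorPB(nnAddr, conf);
      try {
        journal.startLogSegment(reg, segmentTxId);     // open segment at txid
        journal.journal(reg, segmentTxId, 1, records); // ship one transaction
      } finally {
        journal.close();                               // release the RPC proxy
      }
    }

The caller sees only the existing JournalProtocol contract; the protobuf encoding stays an implementation detail behind the translator.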
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
new file mode 100644
index 00000000000..598c7fb4163
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocolPB;
+
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto.NamenodeRoleProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * Utilities for converting protobuf classes to and from
+ * implementation classes.
+ */
+class PBHelper {
+  private PBHelper() {
+    /** Hidden constructor */
+  }
+
+  public static ByteString getByteString(byte[] bytes) {
+    return ByteString.copyFrom(bytes);
+  }
+
+  public static NamenodeRole convert(NamenodeRoleProto role) {
+    switch (role) {
+    case NAMENODE:
+      return NamenodeRole.NAMENODE;
+    case BACKUP:
+      return NamenodeRole.BACKUP;
+    case CHECKPOINT:
+      return NamenodeRole.CHECKPOINT;
+    }
+    return null;
+  }
+
+  public static NamenodeRoleProto convert(NamenodeRole role) {
+    switch (role) {
+    case NAMENODE:
+      return NamenodeRoleProto.NAMENODE;
+    case BACKUP:
+      return NamenodeRoleProto.BACKUP;
+    case CHECKPOINT:
+      return NamenodeRoleProto.CHECKPOINT;
+    }
+    return null;
+  }
+
+  public static StorageInfoProto convert(StorageInfo info) {
+    return StorageInfoProto.newBuilder().setClusterID(info.getClusterID())
+        .setCTime(info.getCTime())
+        .setLayoutVersion(info.getLayoutVersion())
+        .setNamespceID(info.getNamespaceID())
+        .build();
+  }
+
+  public static StorageInfo convert(StorageInfoProto info) {
+    return new StorageInfo(info.getLayoutVersion(), info.getNamespceID(),
+        info.getClusterID(), info.getCTime());
+  }
+
+  public static NamenodeRegistrationProto convert(NamenodeRegistration reg) {
+    return NamenodeRegistrationProto.newBuilder()
+        .setHttpAddress(reg.getHttpAddress())
+        .setRole(convert(reg.getRole()))
+        .setRpcAddress(reg.getAddress())
+        .setStorageInfo(convert((StorageInfo) reg)).build();
+  }
+
+  public static NamenodeRegistration convert(NamenodeRegistrationProto reg) {
+    return new NamenodeRegistration(reg.getRpcAddress(), reg.getHttpAddress(),
+        convert(reg.getStorageInfo()), convert(reg.getRole()));
+  }
+}
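One observation on the role converters above (annotation): both switch statements fall off the end and return null for an unrecognized enum value, so a NamenodeRole added later would silently surface as null and only fail afterwards, inside a protobuf builder. A stricter, purely illustrative variant (convertStrict is not in the patch) would fail fast:

    public static NamenodeRoleProto convertStrict(NamenodeRole role) {
      switch (role) {
      case NAMENODE:
        return NamenodeRoleProto.NAMENODE;
      case BACKUP:
        return NamenodeRoleProto.BACKUP;
      case CHECKPOINT:
        return NamenodeRoleProto.CHECKPOINT;
      default:
        // Clearer than letting setRole(null) throw a bare NPE later.
        throw new IllegalArgumentException("Unexpected role: " + role);
      }
    }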
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/overview.html b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/overview.html
new file mode 100644
index 00000000000..cf620f379b4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/overview.html
@@ -0,0 +1,62 @@
+<html>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements. See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License. You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<body>
+<p>
+The Protocol Buffers data types for NN protocols that use PB go in this package.
+</p>
+</body>
+</html>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
new file mode 100644
index 00000000000..85aa91b9140
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocolPB;
+
+import static junit.framework.Assert.*;
+
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto.NamenodeRoleProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.junit.Test;
+
+/**
+ * Tests for {@link PBHelper}.
+ */
+public class TestPBHelper {
+  @Test
+  public void testConvertNamenodeRole() {
+    assertEquals(NamenodeRoleProto.BACKUP,
+        PBHelper.convert(NamenodeRole.BACKUP));
+    assertEquals(NamenodeRoleProto.CHECKPOINT,
+        PBHelper.convert(NamenodeRole.CHECKPOINT));
+    assertEquals(NamenodeRoleProto.NAMENODE,
+        PBHelper.convert(NamenodeRole.NAMENODE));
+    assertEquals(NamenodeRole.BACKUP,
+        PBHelper.convert(NamenodeRoleProto.BACKUP));
+    assertEquals(NamenodeRole.CHECKPOINT,
+        PBHelper.convert(NamenodeRoleProto.CHECKPOINT));
+    assertEquals(NamenodeRole.NAMENODE,
+        PBHelper.convert(NamenodeRoleProto.NAMENODE));
+  }
+
+  @Test
+  public void testConvertStorageInfo() {
+    StorageInfo info = new StorageInfo(1, 2, "cid", 3);
+    StorageInfoProto infoProto = PBHelper.convert(info);
+    StorageInfo info2 = PBHelper.convert(infoProto);
+    assertEquals(info.getClusterID(), info2.getClusterID());
+    assertEquals(info.getCTime(), info2.getCTime());
+    assertEquals(info.getLayoutVersion(), info2.getLayoutVersion());
+    assertEquals(info.getNamespaceID(), info2.getNamespaceID());
+  }
+
+  @Test
+  public void testConvertNamenodeRegistration() {
+    StorageInfo info = new StorageInfo(1, 2, "cid", 3);
+    NamenodeRegistration reg = new NamenodeRegistration("address:999",
+        "http:1000", info, NamenodeRole.NAMENODE);
+    NamenodeRegistrationProto regProto = PBHelper.convert(reg);
+    NamenodeRegistration reg2 = PBHelper.convert(regProto);
+    assertEquals(reg.getAddress(), reg2.getAddress());
+    assertEquals(reg.getClusterID(), reg2.getClusterID());
+    assertEquals(reg.getCTime(), reg2.getCTime());
+    assertEquals(reg.getHttpAddress(), reg2.getHttpAddress());
+    assertEquals(reg.getLayoutVersion(), reg2.getLayoutVersion());
+    assertEquals(reg.getNamespaceID(), reg2.getNamespaceID());
+    assertEquals(reg.getRegistrationID(), reg2.getRegistrationID());
+    assertEquals(reg.getRole(), reg2.getRole());
+    assertEquals(reg.getVersion(), reg2.getVersion());
+  }
+}
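The tests above exercise in-memory round trips through the converters. A sketch of a round trip through actual serialized bytes would go one step further (annotation; testStorageInfoWireRoundTrip is a hypothetical addition, and StorageInfoProto.parseFrom can throw InvalidProtocolBufferException, hence the throws clause):

    @Test
    public void testStorageInfoWireRoundTrip() throws Exception {
      StorageInfo info = new StorageInfo(1, 2, "cid", 3);
      byte[] wire = PBHelper.convert(info).toByteArray();   // serialize
      StorageInfo back = PBHelper.convert(
          StorageInfoProto.parseFrom(wire));                // parse + convert
      assertEquals(info.getNamespaceID(), back.getNamespaceID());
      assertEquals(info.getClusterID(), back.getClusterID());
    }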