diff --git a/hadoop-common-project/hadoop-common/src/main/conf/hadoop-policy.xml b/hadoop-common-project/hadoop-common/src/main/conf/hadoop-policy.xml
index d282c5841c1..cf3dd1f4ec3 100644
--- a/hadoop-common-project/hadoop-common/src/main/conf/hadoop-policy.xml
+++ b/hadoop-common-project/hadoop-common/src/main/conf/hadoop-policy.xml
@@ -123,6 +123,14 @@
JNs when using the QuorumJournalManager for edit logs.
+
+ security.interqjournal.service.protocol.acl
+ *
+ ACL for InterQJournalProtocol, used by the JN to
+ communicate with other JN
+
+
+
security.mrhs.client.protocol.acl
*
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 0da4bbd0862..ed15fa43eab 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -224,6 +224,8 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
SECURITY_NAMENODE_PROTOCOL_ACL = "security.namenode.protocol.acl";
public static final String SECURITY_QJOURNAL_SERVICE_PROTOCOL_ACL =
"security.qjournal.service.protocol.acl";
+ public static final String SECURITY_INTERQJOURNAL_SERVICE_PROTOCOL_ACL =
+ "security.interqjournal.service.protocol.acl";
public static final String HADOOP_SECURITY_TOKEN_SERVICE_USE_IP =
"hadoop.security.token.service.use_ip";
public static final boolean HADOOP_SECURITY_TOKEN_SERVICE_USE_IP_DEFAULT =
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
index 084c594d111..a9622bae7c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
@@ -1,3 +1,4 @@
+
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
index fe51071e3aa..2094f23cf44 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
@@ -346,6 +346,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
FederationProtocol.proto
RouterProtocol.proto
AliasMapProtocol.proto
+ InterQJournalProtocol.proto
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java
index edc72c5e8f8..e9993757753 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.ha.ZKFCProtocol;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeLifelineProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -57,6 +58,9 @@ public class HDFSPolicyProvider extends PolicyProvider {
NamenodeProtocol.class),
new Service(CommonConfigurationKeys.SECURITY_QJOURNAL_SERVICE_PROTOCOL_ACL,
QJournalProtocol.class),
+ new Service(
+ CommonConfigurationKeys.SECURITY_INTERQJOURNAL_SERVICE_PROTOCOL_ACL,
+ InterQJournalProtocol.class),
new Service(CommonConfigurationKeys.SECURITY_HA_SERVICE_PROTOCOL_ACL,
HAServiceProtocol.class),
new Service(CommonConfigurationKeys.SECURITY_ZKFC_PROTOCOL_ACL,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/InterQJournalProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/InterQJournalProtocol.java
new file mode 100644
index 00000000000..94caeba7888
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/InterQJournalProtocol.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.qjournal.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
+import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.GetEditLogManifestFromJournalResponseProto;
+import org.apache.hadoop.security.KerberosInfo;
+
+import java.io.IOException;
+
+/**
+ * Protocol used to communicate between {@link JournalNode} for journalsync.
+ *
+ * This is responsible for sending edit log manifest.
+ */
+
+@KerberosInfo(
+ serverPrincipal = DFSConfigKeys.DFS_JOURNALNODE_KERBEROS_PRINCIPAL_KEY,
+ clientPrincipal = DFSConfigKeys.DFS_JOURNALNODE_KERBEROS_PRINCIPAL_KEY)
+@InterfaceAudience.Private
+public interface InterQJournalProtocol {
+
+ long versionID = 1L;
+
+ /**
+ * @param jid the journal from which to enumerate edits
+ * @param sinceTxId the first transaction which the client cares about
+ * @param inProgressOk whether or not to check the in-progress edit log
+ * segment
+ * @return a list of edit log segments since the given transaction ID.
+ */
+ GetEditLogManifestFromJournalResponseProto getEditLogManifestFromJournal(
+ String jid, String nameServiceId, long sinceTxId, boolean inProgressOk)
+ throws IOException;
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolPB.java
new file mode 100644
index 00000000000..5fdb8eeb11f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolPB.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.qjournal.protocolPB;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.InterQJournalProtocolService;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.security.KerberosInfo;
+/**
+ * Protocol used to communicate between journal nodes for journal sync.
+ * Note: This extends the protocolbuffer service based interface to
+ * add annotations required for security.
+ */
+@KerberosInfo(
+ serverPrincipal = DFSConfigKeys.DFS_JOURNALNODE_KERBEROS_PRINCIPAL_KEY,
+ clientPrincipal = DFSConfigKeys.DFS_JOURNALNODE_KERBEROS_PRINCIPAL_KEY)
+@ProtocolInfo(protocolName =
+ "org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol",
+ protocolVersion = 1)
+@InterfaceAudience.Private
+public interface InterQJournalProtocolPB extends
+ InterQJournalProtocolService.BlockingInterface {
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolServerSideTranslatorPB.java
new file mode 100644
index 00000000000..15d63879264
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolServerSideTranslatorPB.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.hdfs.qjournal.protocolPB;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.GetEditLogManifestFromJournalRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.GetEditLogManifestFromJournalResponseProto;
+
+import java.io.IOException;
+
+/**
+ * Implementation for protobuf service that forwards requests
+ * received on {@link InterQJournalProtocolPB} to the
+ * {@link InterQJournalProtocol} server implementation.
+ */
+@InterfaceAudience.Private
+public class InterQJournalProtocolServerSideTranslatorPB implements
+ InterQJournalProtocolPB{
+
+ /* Server side implementation to delegate the requests to. */
+ private final InterQJournalProtocol impl;
+
+ public InterQJournalProtocolServerSideTranslatorPB(InterQJournalProtocol
+ impl) {
+ this.impl = impl;
+ }
+
+ @Override
+ public GetEditLogManifestFromJournalResponseProto
+ getEditLogManifestFromJournal(RpcController controller,
+ GetEditLogManifestFromJournalRequestProto
+ request) throws ServiceException {
+ try {
+ return impl.getEditLogManifestFromJournal(
+ request.getJid().getIdentifier(),
+ request.hasNameServiceId() ? request.getNameServiceId() : null,
+ request.getSinceTxId(),
+ request.getInProgressOk());
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolTranslatorPB.java
new file mode 100644
index 00000000000..cdccfca5e87
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolTranslatorPB.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.hdfs.qjournal.protocolPB;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.GetEditLogManifestFromJournalResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.GetEditLogManifestFromJournalRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtocolMetaInterface;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RpcClientUtil;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * This class is the client side translator to translate the requests made on
+ * {@link InterQJournalProtocol} interfaces to the RPC server implementing
+ * {@link InterQJournalProtocolPB}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public class InterQJournalProtocolTranslatorPB implements ProtocolMetaInterface,
+ InterQJournalProtocol, Closeable {
+
+ /* RpcController is not used and hence is set to null. */
+ private final static RpcController NULL_CONTROLLER = null;
+ private final InterQJournalProtocolPB rpcProxy;
+
+ public InterQJournalProtocolTranslatorPB(InterQJournalProtocolPB rpcProxy) {
+ this.rpcProxy = rpcProxy;
+ }
+
+ @Override
+ public void close() {
+ RPC.stopProxy(rpcProxy);
+ }
+
+
+ @Override
+ public GetEditLogManifestFromJournalResponseProto
+ getEditLogManifestFromJournal(String jid, String nameServiceId,
+ long sinceTxId, boolean inProgressOk)
+ throws IOException {
+ try {
+ GetEditLogManifestFromJournalRequestProto.Builder req;
+ req = GetEditLogManifestFromJournalRequestProto.newBuilder()
+ .setJid(convertJournalId(jid))
+ .setSinceTxId(sinceTxId)
+ .setInProgressOk(inProgressOk);
+ if (nameServiceId !=null) {
+ req.setNameServiceId(nameServiceId);
+ }
+ return rpcProxy.getEditLogManifestFromJournal(NULL_CONTROLLER,
+ req.build()
+ );
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ private QJournalProtocolProtos.JournalIdProto convertJournalId(String jid) {
+ return QJournalProtocolProtos.JournalIdProto.newBuilder()
+ .setIdentifier(jid)
+ .build();
+ }
+
+ @Override
+ public boolean isMethodSupported(String methodName) throws IOException {
+ return RpcClientUtil.isMethodSupported(rpcProxy,
+ InterQJournalProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+ RPC.getProtocolVersion(InterQJournalProtocolPB.class), methodName);
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
index 748a51c65cd..6cb933b5e4f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
@@ -26,8 +26,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.InterQJournalProtocolService;
+import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.GetEditLogManifestFromJournalResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
@@ -36,6 +40,8 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRe
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.QJournalProtocolService;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.qjournal.protocolPB.InterQJournalProtocolPB;
+import org.apache.hadoop.hdfs.qjournal.protocolPB.InterQJournalProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolPB;
import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
@@ -52,7 +58,8 @@ import com.google.protobuf.BlockingService;
@InterfaceAudience.Private
@VisibleForTesting
-public class JournalNodeRpcServer implements QJournalProtocol {
+public class JournalNodeRpcServer implements QJournalProtocol,
+ InterQJournalProtocol {
private static final int HANDLER_COUNT = 5;
private final JournalNode jn;
private Server server;
@@ -84,6 +91,19 @@ public class JournalNodeRpcServer implements QJournalProtocol {
.setVerbose(false)
.build();
+
+ //Adding InterQJournalProtocolPB to server
+ InterQJournalProtocolServerSideTranslatorPB
+ qJournalProtocolServerSideTranslatorPB = new
+ InterQJournalProtocolServerSideTranslatorPB(this);
+
+ BlockingService interQJournalProtocolService = InterQJournalProtocolService
+ .newReflectiveBlockingService(qJournalProtocolServerSideTranslatorPB);
+
+ DFSUtil.addPBProtocol(confCopy, InterQJournalProtocolPB.class,
+ interQJournalProtocolService, server);
+
+
// set service-level authorization security policy
if (confCopy.getBoolean(
CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
@@ -263,4 +283,20 @@ public class JournalNodeRpcServer implements QJournalProtocol {
String nameServiceId) throws IOException {
return jn.getJournalCTime(journalId, nameServiceId);
}
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public GetEditLogManifestFromJournalResponseProto
+ getEditLogManifestFromJournal(String jid, String nameServiceId,
+ long sinceTxId, boolean inProgressOk)
+ throws IOException {
+ RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid, nameServiceId)
+ .getEditLogManifest(sinceTxId, inProgressOk);
+
+ return GetEditLogManifestFromJournalResponseProto.newBuilder()
+ .setManifest(PBHelper.convert(manifest))
+ .setHttpPort(jn.getBoundHttpAddress().getPort())
+ .setFromURL(jn.getHttpServerURI())
+ .build();
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
index 490b3ea452a..fa47b143cce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.qjournal.server;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
@@ -28,19 +27,17 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
-import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
-import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos
- .JournalIdProto;
-import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos
- .GetEditLogManifestRequestProto;
-import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos
- .GetEditLogManifestResponseProto;
-import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolPB;
+import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.GetEditLogManifestFromJournalResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocolPB.InterQJournalProtocolPB;
+import org.apache.hadoop.hdfs.qjournal.protocolPB.InterQJournalProtocolTranslatorPB;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,6 +49,7 @@ import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
+import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
@@ -69,7 +67,6 @@ public class JournalNodeSyncer {
private final Journal journal;
private final String jid;
private String nameServiceId;
- private final JournalIdProto jidProto;
private final JNStorage jnStorage;
private final Configuration conf;
private volatile Daemon syncJournalDaemon;
@@ -90,7 +87,6 @@ public class JournalNodeSyncer {
this.journal = journal;
this.jid = jid;
this.nameServiceId = nameServiceId;
- this.jidProto = convertJournalId(this.jid);
this.jnStorage = journal.getStorage();
this.conf = conf;
journalSyncInterval = conf.getLong(
@@ -235,7 +231,7 @@ public class JournalNodeSyncer {
LOG.info("Syncing Journal " + jn.getBoundIpcAddress().getAddress() + ":"
+ jn.getBoundIpcAddress().getPort() + " with "
+ otherJNProxies.get(index) + ", journal id: " + jid);
- final QJournalProtocolPB jnProxy = otherJNProxies.get(index).jnProxy;
+ final InterQJournalProtocol jnProxy = otherJNProxies.get(index).jnProxy;
if (jnProxy == null) {
LOG.error("JournalNode Proxy not found.");
return;
@@ -249,13 +245,11 @@ public class JournalNodeSyncer {
return;
}
- GetEditLogManifestResponseProto editLogManifest;
+ GetEditLogManifestFromJournalResponseProto editLogManifest;
try {
- editLogManifest = jnProxy.getEditLogManifest(null,
- GetEditLogManifestRequestProto.newBuilder().setJid(jidProto)
- .setSinceTxId(0)
- .setInProgressOk(false).build());
- } catch (ServiceException e) {
+ editLogManifest = jnProxy.getEditLogManifestFromJournal(jid,
+ nameServiceId, 0, false);
+ } catch (IOException e) {
LOG.error("Could not sync with Journal at " +
otherJNProxies.get(journalNodeIndexForSync), e);
return;
@@ -323,14 +317,8 @@ public class JournalNodeSyncer {
Sets.newHashSet(jn.getBoundIpcAddress()));
}
- private JournalIdProto convertJournalId(String journalId) {
- return QJournalProtocolProtos.JournalIdProto.newBuilder()
- .setIdentifier(journalId)
- .build();
- }
-
private void getMissingLogSegments(List thisJournalEditLogs,
- GetEditLogManifestResponseProto response,
+ GetEditLogManifestFromJournalResponseProto response,
JournalNodeProxy remoteJNproxy) {
List otherJournalEditLogs = PBHelper.convert(
@@ -497,13 +485,26 @@ public class JournalNodeSyncer {
private class JournalNodeProxy {
private final InetSocketAddress jnAddr;
- private final QJournalProtocolPB jnProxy;
+ private final InterQJournalProtocol jnProxy;
private URL httpServerUrl;
JournalNodeProxy(InetSocketAddress jnAddr) throws IOException {
+ final Configuration confCopy = new Configuration(conf);
this.jnAddr = jnAddr;
- this.jnProxy = RPC.getProxy(QJournalProtocolPB.class,
- RPC.getProtocolVersion(QJournalProtocolPB.class), jnAddr, conf);
+ this.jnProxy = SecurityUtil.doAsLoginUser(
+ new PrivilegedExceptionAction() {
+ @Override
+ public InterQJournalProtocol run() throws IOException {
+ RPC.setProtocolEngine(confCopy, InterQJournalProtocolPB.class,
+ ProtobufRpcEngine.class);
+ InterQJournalProtocolPB interQJournalProtocolPB = RPC.getProxy(
+ InterQJournalProtocolPB.class,
+ RPC.getProtocolVersion(InterQJournalProtocolPB.class),
+ jnAddr, confCopy);
+ return new InterQJournalProtocolTranslatorPB(
+ interQJournalProtocolPB);
+ }
+ });
}
@Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterQJournalProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterQJournalProtocol.proto
new file mode 100644
index 00000000000..8fe9e6941a5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterQJournalProtocol.proto
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * These .proto interfaces are private and stable.
+ * Please see http://wiki.apache.org/hadoop/Compatibility
+ * for what changes are allowed for a *stable* .proto interface.
+ */
+
+option java_package = "org.apache.hadoop.hdfs.qjournal.protocol";
+option java_outer_classname = "InterQJournalProtocolProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.hdfs.qjournal;
+
+import "HdfsServer.proto";
+import "QJournalProtocol.proto";
+
+message GetEditLogManifestFromJournalRequestProto {
+ required JournalIdProto jid = 1;
+ required uint64 sinceTxId = 2; // Transaction ID
+ optional bool inProgressOk = 3 [default = false];
+ optional string nameServiceId = 4;
+}
+
+message GetEditLogManifestFromJournalResponseProto {
+ required RemoteEditLogManifestProto manifest = 1;
+ required uint32 httpPort = 2;
+ optional string fromURL = 3;
+}
+
+service InterQJournalProtocolService {
+ rpc getEditLogManifestFromJournal(GetEditLogManifestFromJournalRequestProto)
+ returns (GetEditLogManifestFromJournalResponseProto);
+}
\ No newline at end of file