HDFS-13023. Journal Sync does not work on a secure cluster. Contributed by Bharat Viswanadham.

This commit is contained in:
Hanisha Koneru 2018-01-22 16:02:32 -08:00
parent 3fde0f1db5
commit 6347b2253d
12 changed files with 386 additions and 29 deletions

View File

@ -123,6 +123,14 @@
JNs when using the QuorumJournalManager for edit logs.</description> JNs when using the QuorumJournalManager for edit logs.</description>
</property> </property>
<property>
<name>security.interqjournal.service.protocol.acl</name>
<value>*</value>
<description>ACL for InterQJournalProtocol, used by the JN to
communicate with other JN
</description>
</property>
<property> <property>
<name>security.mrhs.client.protocol.acl</name> <name>security.mrhs.client.protocol.acl</name>
<value>*</value> <value>*</value>

View File

@ -224,6 +224,8 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
SECURITY_NAMENODE_PROTOCOL_ACL = "security.namenode.protocol.acl"; SECURITY_NAMENODE_PROTOCOL_ACL = "security.namenode.protocol.acl";
public static final String SECURITY_QJOURNAL_SERVICE_PROTOCOL_ACL = public static final String SECURITY_QJOURNAL_SERVICE_PROTOCOL_ACL =
"security.qjournal.service.protocol.acl"; "security.qjournal.service.protocol.acl";
public static final String SECURITY_INTERQJOURNAL_SERVICE_PROTOCOL_ACL =
"security.interqjournal.service.protocol.acl";
public static final String HADOOP_SECURITY_TOKEN_SERVICE_USE_IP = public static final String HADOOP_SECURITY_TOKEN_SERVICE_USE_IP =
"hadoop.security.token.service.use_ip"; "hadoop.security.token.service.use_ip";
public static final boolean HADOOP_SECURITY_TOKEN_SERVICE_USE_IP_DEFAULT = public static final boolean HADOOP_SECURITY_TOKEN_SERVICE_USE_IP_DEFAULT =

View File

@ -1,3 +1,4 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file

View File

@ -346,6 +346,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<include>FederationProtocol.proto</include> <include>FederationProtocol.proto</include>
<include>RouterProtocol.proto</include> <include>RouterProtocol.proto</include>
<include>AliasMapProtocol.proto</include> <include>AliasMapProtocol.proto</include>
<include>InterQJournalProtocol.proto</include>
</includes> </includes>
</source> </source>
</configuration> </configuration>

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.ha.ZKFCProtocol;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol; import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeLifelineProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeLifelineProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@ -57,6 +58,9 @@ public class HDFSPolicyProvider extends PolicyProvider {
NamenodeProtocol.class), NamenodeProtocol.class),
new Service(CommonConfigurationKeys.SECURITY_QJOURNAL_SERVICE_PROTOCOL_ACL, new Service(CommonConfigurationKeys.SECURITY_QJOURNAL_SERVICE_PROTOCOL_ACL,
QJournalProtocol.class), QJournalProtocol.class),
new Service(
CommonConfigurationKeys.SECURITY_INTERQJOURNAL_SERVICE_PROTOCOL_ACL,
InterQJournalProtocol.class),
new Service(CommonConfigurationKeys.SECURITY_HA_SERVICE_PROTOCOL_ACL, new Service(CommonConfigurationKeys.SECURITY_HA_SERVICE_PROTOCOL_ACL,
HAServiceProtocol.class), HAServiceProtocol.class),
new Service(CommonConfigurationKeys.SECURITY_ZKFC_PROTOCOL_ACL, new Service(CommonConfigurationKeys.SECURITY_ZKFC_PROTOCOL_ACL,

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.qjournal.protocol;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.GetEditLogManifestFromJournalResponseProto;
import org.apache.hadoop.security.KerberosInfo;
import java.io.IOException;
/**
* Protocol used to communicate between {@link JournalNode} for journalsync.
*
* This is responsible for sending edit log manifest.
*/
@KerberosInfo(
serverPrincipal = DFSConfigKeys.DFS_JOURNALNODE_KERBEROS_PRINCIPAL_KEY,
clientPrincipal = DFSConfigKeys.DFS_JOURNALNODE_KERBEROS_PRINCIPAL_KEY)
@InterfaceAudience.Private
public interface InterQJournalProtocol {
long versionID = 1L;
/**
* @param jid the journal from which to enumerate edits
* @param sinceTxId the first transaction which the client cares about
* @param inProgressOk whether or not to check the in-progress edit log
* segment
* @return a list of edit log segments since the given transaction ID.
*/
GetEditLogManifestFromJournalResponseProto getEditLogManifestFromJournal(
String jid, String nameServiceId, long sinceTxId, boolean inProgressOk)
throws IOException;
}

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.qjournal.protocolPB;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.InterQJournalProtocolService;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.security.KerberosInfo;
/**
* Protocol used to communicate between journal nodes for journal sync.
* Note: This extends the protocolbuffer service based interface to
* add annotations required for security.
*/
@KerberosInfo(
serverPrincipal = DFSConfigKeys.DFS_JOURNALNODE_KERBEROS_PRINCIPAL_KEY,
clientPrincipal = DFSConfigKeys.DFS_JOURNALNODE_KERBEROS_PRINCIPAL_KEY)
@ProtocolInfo(protocolName =
"org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol",
protocolVersion = 1)
@InterfaceAudience.Private
public interface InterQJournalProtocolPB extends
InterQJournalProtocolService.BlockingInterface {
}

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.qjournal.protocolPB;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.GetEditLogManifestFromJournalRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.GetEditLogManifestFromJournalResponseProto;
import java.io.IOException;
/**
* Implementation for protobuf service that forwards requests
* received on {@link InterQJournalProtocolPB} to the
* {@link InterQJournalProtocol} server implementation.
*/
@InterfaceAudience.Private
public class InterQJournalProtocolServerSideTranslatorPB implements
InterQJournalProtocolPB{
/* Server side implementation to delegate the requests to. */
private final InterQJournalProtocol impl;
public InterQJournalProtocolServerSideTranslatorPB(InterQJournalProtocol
impl) {
this.impl = impl;
}
@Override
public GetEditLogManifestFromJournalResponseProto
getEditLogManifestFromJournal(RpcController controller,
GetEditLogManifestFromJournalRequestProto
request) throws ServiceException {
try {
return impl.getEditLogManifestFromJournal(
request.getJid().getIdentifier(),
request.hasNameServiceId() ? request.getNameServiceId() : null,
request.getSinceTxId(),
request.getInProgressOk());
} catch (IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.qjournal.protocolPB;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.GetEditLogManifestFromJournalResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.GetEditLogManifestFromJournalRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil;
import java.io.Closeable;
import java.io.IOException;
/**
* This class is the client side translator to translate the requests made on
* {@link InterQJournalProtocol} interfaces to the RPC server implementing
* {@link InterQJournalProtocolPB}.
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
public class InterQJournalProtocolTranslatorPB implements ProtocolMetaInterface,
InterQJournalProtocol, Closeable {
/* RpcController is not used and hence is set to null. */
private final static RpcController NULL_CONTROLLER = null;
private final InterQJournalProtocolPB rpcProxy;
public InterQJournalProtocolTranslatorPB(InterQJournalProtocolPB rpcProxy) {
this.rpcProxy = rpcProxy;
}
@Override
public void close() {
RPC.stopProxy(rpcProxy);
}
@Override
public GetEditLogManifestFromJournalResponseProto
getEditLogManifestFromJournal(String jid, String nameServiceId,
long sinceTxId, boolean inProgressOk)
throws IOException {
try {
GetEditLogManifestFromJournalRequestProto.Builder req;
req = GetEditLogManifestFromJournalRequestProto.newBuilder()
.setJid(convertJournalId(jid))
.setSinceTxId(sinceTxId)
.setInProgressOk(inProgressOk);
if (nameServiceId !=null) {
req.setNameServiceId(nameServiceId);
}
return rpcProxy.getEditLogManifestFromJournal(NULL_CONTROLLER,
req.build()
);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
private QJournalProtocolProtos.JournalIdProto convertJournalId(String jid) {
return QJournalProtocolProtos.JournalIdProto.newBuilder()
.setIdentifier(jid)
.build();
}
@Override
public boolean isMethodSupported(String methodName) throws IOException {
return RpcClientUtil.isMethodSupported(rpcProxy,
InterQJournalProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
RPC.getProtocolVersion(InterQJournalProtocolPB.class), methodName);
}
}

View File

@ -26,8 +26,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.InterQJournalProtocolService;
import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.GetEditLogManifestFromJournalResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
@ -36,6 +40,8 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRe
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.QJournalProtocolService; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.QJournalProtocolService;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo; import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
import org.apache.hadoop.hdfs.qjournal.protocolPB.InterQJournalProtocolPB;
import org.apache.hadoop.hdfs.qjournal.protocolPB.InterQJournalProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolPB; import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolPB;
import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.common.StorageInfo;
@ -52,7 +58,8 @@ import com.google.protobuf.BlockingService;
@InterfaceAudience.Private @InterfaceAudience.Private
@VisibleForTesting @VisibleForTesting
public class JournalNodeRpcServer implements QJournalProtocol { public class JournalNodeRpcServer implements QJournalProtocol,
InterQJournalProtocol {
private static final int HANDLER_COUNT = 5; private static final int HANDLER_COUNT = 5;
private final JournalNode jn; private final JournalNode jn;
private Server server; private Server server;
@ -84,6 +91,19 @@ public class JournalNodeRpcServer implements QJournalProtocol {
.setVerbose(false) .setVerbose(false)
.build(); .build();
//Adding InterQJournalProtocolPB to server
InterQJournalProtocolServerSideTranslatorPB
qJournalProtocolServerSideTranslatorPB = new
InterQJournalProtocolServerSideTranslatorPB(this);
BlockingService interQJournalProtocolService = InterQJournalProtocolService
.newReflectiveBlockingService(qJournalProtocolServerSideTranslatorPB);
DFSUtil.addPBProtocol(confCopy, InterQJournalProtocolPB.class,
interQJournalProtocolService, server);
// set service-level authorization security policy // set service-level authorization security policy
if (confCopy.getBoolean( if (confCopy.getBoolean(
CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) { CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
@ -263,4 +283,20 @@ public class JournalNodeRpcServer implements QJournalProtocol {
String nameServiceId) throws IOException { String nameServiceId) throws IOException {
return jn.getJournalCTime(journalId, nameServiceId); return jn.getJournalCTime(journalId, nameServiceId);
} }
@SuppressWarnings("deprecation")
@Override
public GetEditLogManifestFromJournalResponseProto
getEditLogManifestFromJournal(String jid, String nameServiceId,
long sinceTxId, boolean inProgressOk)
throws IOException {
RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid, nameServiceId)
.getEditLogManifest(sinceTxId, inProgressOk);
return GetEditLogManifestFromJournalResponseProto.newBuilder()
.setManifest(PBHelper.convert(manifest))
.setHttpPort(jn.getBoundHttpAddress().getPort())
.setFromURL(jn.getHttpServerURI())
.build();
}
} }

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.qjournal.server;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
@ -28,19 +27,17 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos; import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.GetEditLogManifestFromJournalResponseProto;
.JournalIdProto; import org.apache.hadoop.hdfs.qjournal.protocolPB.InterQJournalProtocolPB;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos import org.apache.hadoop.hdfs.qjournal.protocolPB.InterQJournalProtocolTranslatorPB;
.GetEditLogManifestRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos
.GetEditLogManifestResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolPB;
import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -52,6 +49,7 @@ import java.net.MalformedURLException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.net.URL; import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -69,7 +67,6 @@ public class JournalNodeSyncer {
private final Journal journal; private final Journal journal;
private final String jid; private final String jid;
private String nameServiceId; private String nameServiceId;
private final JournalIdProto jidProto;
private final JNStorage jnStorage; private final JNStorage jnStorage;
private final Configuration conf; private final Configuration conf;
private volatile Daemon syncJournalDaemon; private volatile Daemon syncJournalDaemon;
@ -90,7 +87,6 @@ public class JournalNodeSyncer {
this.journal = journal; this.journal = journal;
this.jid = jid; this.jid = jid;
this.nameServiceId = nameServiceId; this.nameServiceId = nameServiceId;
this.jidProto = convertJournalId(this.jid);
this.jnStorage = journal.getStorage(); this.jnStorage = journal.getStorage();
this.conf = conf; this.conf = conf;
journalSyncInterval = conf.getLong( journalSyncInterval = conf.getLong(
@ -235,7 +231,7 @@ public class JournalNodeSyncer {
LOG.info("Syncing Journal " + jn.getBoundIpcAddress().getAddress() + ":" LOG.info("Syncing Journal " + jn.getBoundIpcAddress().getAddress() + ":"
+ jn.getBoundIpcAddress().getPort() + " with " + jn.getBoundIpcAddress().getPort() + " with "
+ otherJNProxies.get(index) + ", journal id: " + jid); + otherJNProxies.get(index) + ", journal id: " + jid);
final QJournalProtocolPB jnProxy = otherJNProxies.get(index).jnProxy; final InterQJournalProtocol jnProxy = otherJNProxies.get(index).jnProxy;
if (jnProxy == null) { if (jnProxy == null) {
LOG.error("JournalNode Proxy not found."); LOG.error("JournalNode Proxy not found.");
return; return;
@ -249,13 +245,11 @@ public class JournalNodeSyncer {
return; return;
} }
GetEditLogManifestResponseProto editLogManifest; GetEditLogManifestFromJournalResponseProto editLogManifest;
try { try {
editLogManifest = jnProxy.getEditLogManifest(null, editLogManifest = jnProxy.getEditLogManifestFromJournal(jid,
GetEditLogManifestRequestProto.newBuilder().setJid(jidProto) nameServiceId, 0, false);
.setSinceTxId(0) } catch (IOException e) {
.setInProgressOk(false).build());
} catch (ServiceException e) {
LOG.error("Could not sync with Journal at " + LOG.error("Could not sync with Journal at " +
otherJNProxies.get(journalNodeIndexForSync), e); otherJNProxies.get(journalNodeIndexForSync), e);
return; return;
@ -323,14 +317,8 @@ public class JournalNodeSyncer {
Sets.newHashSet(jn.getBoundIpcAddress())); Sets.newHashSet(jn.getBoundIpcAddress()));
} }
private JournalIdProto convertJournalId(String journalId) {
return QJournalProtocolProtos.JournalIdProto.newBuilder()
.setIdentifier(journalId)
.build();
}
private void getMissingLogSegments(List<RemoteEditLog> thisJournalEditLogs, private void getMissingLogSegments(List<RemoteEditLog> thisJournalEditLogs,
GetEditLogManifestResponseProto response, GetEditLogManifestFromJournalResponseProto response,
JournalNodeProxy remoteJNproxy) { JournalNodeProxy remoteJNproxy) {
List<RemoteEditLog> otherJournalEditLogs = PBHelper.convert( List<RemoteEditLog> otherJournalEditLogs = PBHelper.convert(
@ -497,13 +485,26 @@ public class JournalNodeSyncer {
private class JournalNodeProxy { private class JournalNodeProxy {
private final InetSocketAddress jnAddr; private final InetSocketAddress jnAddr;
private final QJournalProtocolPB jnProxy; private final InterQJournalProtocol jnProxy;
private URL httpServerUrl; private URL httpServerUrl;
JournalNodeProxy(InetSocketAddress jnAddr) throws IOException { JournalNodeProxy(InetSocketAddress jnAddr) throws IOException {
final Configuration confCopy = new Configuration(conf);
this.jnAddr = jnAddr; this.jnAddr = jnAddr;
this.jnProxy = RPC.getProxy(QJournalProtocolPB.class, this.jnProxy = SecurityUtil.doAsLoginUser(
RPC.getProtocolVersion(QJournalProtocolPB.class), jnAddr, conf); new PrivilegedExceptionAction<InterQJournalProtocol>() {
@Override
public InterQJournalProtocol run() throws IOException {
RPC.setProtocolEngine(confCopy, InterQJournalProtocolPB.class,
ProtobufRpcEngine.class);
InterQJournalProtocolPB interQJournalProtocolPB = RPC.getProxy(
InterQJournalProtocolPB.class,
RPC.getProtocolVersion(InterQJournalProtocolPB.class),
jnAddr, confCopy);
return new InterQJournalProtocolTranslatorPB(
interQJournalProtocolPB);
}
});
} }
@Override @Override

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* These .proto interfaces are private and stable.
* Please see http://wiki.apache.org/hadoop/Compatibility
* for what changes are allowed for a *stable* .proto interface.
*/
option java_package = "org.apache.hadoop.hdfs.qjournal.protocol";
option java_outer_classname = "InterQJournalProtocolProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.hdfs.qjournal;
import "HdfsServer.proto";
import "QJournalProtocol.proto";
message GetEditLogManifestFromJournalRequestProto {
required JournalIdProto jid = 1;
required uint64 sinceTxId = 2; // Transaction ID
optional bool inProgressOk = 3 [default = false];
optional string nameServiceId = 4;
}
message GetEditLogManifestFromJournalResponseProto {
required RemoteEditLogManifestProto manifest = 1;
required uint32 httpPort = 2;
optional string fromURL = 3;
}
service InterQJournalProtocolService {
rpc getEditLogManifestFromJournal(GetEditLogManifestFromJournalRequestProto)
returns (GetEditLogManifestFromJournalResponseProto);
}