HDFS-3893. QJM: Make QJM work with security enabled. Contributed by Aaron T. Myers.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3077@1381770 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8b88542f60
commit
df801074c9
|
@ -25,6 +25,7 @@ import java.net.URLConnection;
|
|||
import java.net.UnknownHostException;
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.ServiceLoader;
|
||||
|
@ -452,6 +453,41 @@ public class SecurityUtil {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform the given action as the daemon's login user. If an
|
||||
* InterruptedException is thrown, it is converted to an IOException.
|
||||
*
|
||||
* @param action the action to perform
|
||||
* @return the result of the action
|
||||
* @throws IOException in the event of error
|
||||
*/
|
||||
public static <T> T doAsLoginUser(PrivilegedExceptionAction<T> action)
|
||||
throws IOException {
|
||||
return doAsUser(UserGroupInformation.getLoginUser(), action);
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform the given action as the daemon's current user. If an
|
||||
* InterruptedException is thrown, it is converted to an IOException.
|
||||
*
|
||||
* @param action the action to perform
|
||||
* @return the result of the action
|
||||
* @throws IOException in the event of error
|
||||
*/
|
||||
public static <T> T doAsCurrentUser(PrivilegedExceptionAction<T> action)
|
||||
throws IOException {
|
||||
return doAsUser(UserGroupInformation.getCurrentUser(), action);
|
||||
}
|
||||
|
||||
private static <T> T doAsUser(UserGroupInformation ugi,
|
||||
PrivilegedExceptionAction<T> action) throws IOException {
|
||||
try {
|
||||
return ugi.doAs(action);
|
||||
} catch (InterruptedException ie) {
|
||||
throw new IOException(ie);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Open a (if need be) secure connection to a URL in a secure environment
|
||||
* that is using SPNEGO to authenticate its URLs. All Namenode and Secondary
|
||||
|
|
|
@ -239,5 +239,12 @@
|
|||
group list is separated by a blank. For e.g. "alice,bob users,wheel".
|
||||
A special value of "*" means all users are allowed.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>security.qjournal.service.protocol.acl</name>
|
||||
<value>${HADOOP_HDFS_USER}</value>
|
||||
<description>ACL for QJournalProtocol, used by the NN to communicate with
|
||||
JNs when using the QuorumJournalManager for edit logs.</description>
|
||||
</property>
|
||||
|
||||
</configuration>
|
||||
|
|
|
@ -48,3 +48,5 @@ HDFS-3870. Add metrics to JournalNode (todd)
|
|||
HDFS-3891. Make selectInputStreams throw IOE instead of RTE (todd)
|
||||
|
||||
HDFS-3726. If a logger misses an RPC, don't retry that logger until next segment (todd)
|
||||
|
||||
HDFS-3893. QJM: Make QJM work with security enabled. (atm)
|
||||
|
|
|
@ -75,6 +75,7 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
|||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.net.NodeBase;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
@ -450,6 +451,34 @@ public class DFSUtil {
|
|||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return a collection of all configured NN Kerberos principals.
|
||||
*/
|
||||
public static Set<String> getAllNnPrincipals(Configuration conf) throws IOException {
|
||||
Set<String> principals = new HashSet<String>();
|
||||
for (String nsId : DFSUtil.getNameServiceIds(conf)) {
|
||||
if (HAUtil.isHAEnabled(conf, nsId)) {
|
||||
for (String nnId : DFSUtil.getNameNodeIds(conf, nsId)) {
|
||||
Configuration confForNn = new Configuration(conf);
|
||||
NameNode.initializeGenericKeys(confForNn, nsId, nnId);
|
||||
String principal = SecurityUtil.getServerPrincipal(confForNn
|
||||
.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
|
||||
NameNode.getAddress(confForNn).getHostName());
|
||||
principals.add(principal);
|
||||
}
|
||||
} else {
|
||||
Configuration confForNn = new Configuration(conf);
|
||||
NameNode.initializeGenericKeys(confForNn, nsId, null);
|
||||
String principal = SecurityUtil.getServerPrincipal(confForNn
|
||||
.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
|
||||
NameNode.getAddress(confForNn).getHostName());
|
||||
principals.add(principal);
|
||||
}
|
||||
}
|
||||
|
||||
return principals;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns list of InetSocketAddress corresponding to HA NN RPC addresses from
|
||||
* the configuration.
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.hadoop.ha.HAServiceProtocol;
|
|||
import org.apache.hadoop.ha.ZKFCProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
||||
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
||||
|
@ -46,6 +47,7 @@ public class HDFSPolicyProvider extends PolicyProvider {
|
|||
new Service("security.inter.datanode.protocol.acl",
|
||||
InterDatanodeProtocol.class),
|
||||
new Service("security.namenode.protocol.acl", NamenodeProtocol.class),
|
||||
new Service("security.qjournal.service.protocol.acl", QJournalProtocol.class),
|
||||
new Service(CommonConfigurationKeys.SECURITY_HA_SERVICE_PROTOCOL_ACL,
|
||||
HAServiceProtocol.class),
|
||||
new Service(CommonConfigurationKeys.SECURITY_ZKFC_PROTOCOL_ACL,
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.io.IOException;
|
|||
import java.net.InetSocketAddress;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
@ -46,6 +47,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
|||
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
@ -164,13 +166,19 @@ public class IPCLoggerChannel implements AsyncLogger {
|
|||
}
|
||||
|
||||
protected QJournalProtocol createProxy() throws IOException {
|
||||
RPC.setProtocolEngine(conf,
|
||||
QJournalProtocolPB.class, ProtobufRpcEngine.class);
|
||||
QJournalProtocolPB pbproxy = RPC.getProxy(
|
||||
QJournalProtocolPB.class,
|
||||
RPC.getProtocolVersion(QJournalProtocolPB.class),
|
||||
addr, conf);
|
||||
return new QJournalProtocolTranslatorPB(pbproxy);
|
||||
return SecurityUtil.doAsLoginUser(
|
||||
new PrivilegedExceptionAction<QJournalProtocol>() {
|
||||
@Override
|
||||
public QJournalProtocol run() throws IOException {
|
||||
RPC.setProtocolEngine(conf,
|
||||
QJournalProtocolPB.class, ProtobufRpcEngine.class);
|
||||
QJournalProtocolPB pbproxy = RPC.getProxy(
|
||||
QJournalProtocolPB.class,
|
||||
RPC.getProtocolVersion(QJournalProtocolPB.class),
|
||||
addr, conf);
|
||||
return new QJournalProtocolTranslatorPB(pbproxy);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -22,6 +22,8 @@ import java.io.FileInputStream;
|
|||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.net.URLEncoder;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.servlet.ServletContext;
|
||||
import javax.servlet.ServletException;
|
||||
|
@ -34,15 +36,19 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.HAUtil;
|
||||
import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
|
||||
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
|
||||
import org.apache.hadoop.hdfs.server.namenode.GetImageServlet;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.ServletUtil;
|
||||
|
@ -67,34 +73,53 @@ public class GetJournalEditServlet extends HttpServlet {
|
|||
static final String JOURNAL_ID_PARAM = "jid";
|
||||
static final String SEGMENT_TXID_PARAM = "segmentTxId";
|
||||
|
||||
// TODO: create security tests
|
||||
protected boolean isValidRequestor(String remoteUser, Configuration conf)
|
||||
protected boolean isValidRequestor(HttpServletRequest request, Configuration conf)
|
||||
throws IOException {
|
||||
if (remoteUser == null) { // This really shouldn't happen...
|
||||
String remotePrincipal = request.getUserPrincipal().getName();
|
||||
String remoteShortName = request.getRemoteUser();
|
||||
if (remotePrincipal == null) { // This really shouldn't happen...
|
||||
LOG.warn("Received null remoteUser while authorizing access to " +
|
||||
"GetJournalEditServlet");
|
||||
return false;
|
||||
}
|
||||
|
||||
String[] validRequestors = {
|
||||
SecurityUtil.getServerPrincipal(conf
|
||||
.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY), NameNode
|
||||
.getAddress(conf).getHostName()),
|
||||
SecurityUtil.getServerPrincipal(conf
|
||||
.get(DFSConfigKeys.DFS_JOURNALNODE_USER_NAME_KEY),
|
||||
NameNode.getAddress(conf).getHostName()) };
|
||||
// TODO: above principal is not correct, since each JN will have a
|
||||
// different hostname.
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Validating request made by " + remotePrincipal +
|
||||
" / " + remoteShortName + ". This user is: " +
|
||||
UserGroupInformation.getLoginUser());
|
||||
}
|
||||
|
||||
Set<String> validRequestors = new HashSet<String>();
|
||||
validRequestors.addAll(DFSUtil.getAllNnPrincipals(conf));
|
||||
validRequestors.add(
|
||||
SecurityUtil.getServerPrincipal(conf
|
||||
.get(DFSConfigKeys.DFS_SECONDARY_NAMENODE_USER_NAME_KEY),
|
||||
SecondaryNameNode.getHttpAddress(conf).getHostName()));
|
||||
|
||||
// Check the full principal name of all the configured valid requestors.
|
||||
for (String v : validRequestors) {
|
||||
if (v != null && v.equals(remoteUser)) {
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("isValidRequestor is comparing to valid requestor: " + v);
|
||||
if (v != null && v.equals(remotePrincipal)) {
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("isValidRequestor is allowing: " + remoteUser);
|
||||
LOG.debug("isValidRequestor is allowing: " + remotePrincipal);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
// Additionally, we compare the short name of the requestor to this JN's
|
||||
// username, because we want to allow requests from other JNs during
|
||||
// recovery, but we can't enumerate the full list of JNs.
|
||||
if (remoteShortName.equals(
|
||||
UserGroupInformation.getLoginUser().getShortUserName())) {
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("isValidRequestor is allowing other JN principal: " +
|
||||
remotePrincipal);
|
||||
return true;
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("isValidRequestor is rejecting: " + remoteUser);
|
||||
LOG.debug("isValidRequestor is rejecting: " + remotePrincipal);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -102,7 +127,7 @@ public class GetJournalEditServlet extends HttpServlet {
|
|||
HttpServletRequest request, HttpServletResponse response)
|
||||
throws IOException {
|
||||
if (UserGroupInformation.isSecurityEnabled()
|
||||
&& !isValidRequestor(request.getRemoteUser(), conf)) {
|
||||
&& !isValidRequestor(request, conf)) {
|
||||
response.sendError(HttpServletResponse.SC_FORBIDDEN,
|
||||
"Only Namenode and another JournalNode may access this servlet");
|
||||
LOG.warn("Received non-NN/JN request for edits from "
|
||||
|
@ -206,4 +231,4 @@ public class GetJournalEditServlet extends HttpServlet {
|
|||
}
|
||||
return path.toString();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.net.URL;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -51,6 +52,7 @@ import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;
|
|||
import org.apache.hadoop.hdfs.util.BestEffortLongFile;
|
||||
import org.apache.hadoop.hdfs.util.PersistentLongFile;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
@ -708,31 +710,37 @@ class Journal implements Closeable {
|
|||
* @throws IOException
|
||||
*/
|
||||
private void syncLog(RequestInfo reqInfo,
|
||||
SegmentStateProto segment, URL url) throws IOException {
|
||||
final SegmentStateProto segment, final URL url) throws IOException {
|
||||
String tmpFileName =
|
||||
"synclog_" + segment.getStartTxId() + "_" +
|
||||
reqInfo.getEpoch() + "." + reqInfo.getIpcSerialNumber();
|
||||
|
||||
List<File> localPaths = storage.getFiles(null, tmpFileName);
|
||||
final List<File> localPaths = storage.getFiles(null, tmpFileName);
|
||||
assert localPaths.size() == 1;
|
||||
File tmpFile = localPaths.get(0);
|
||||
|
||||
boolean success = false;
|
||||
final File tmpFile = localPaths.get(0);
|
||||
|
||||
LOG.info("Synchronizing log " +
|
||||
TextFormat.shortDebugString(segment) + " from " + url);
|
||||
TransferFsImage.doGetUrl(url, localPaths, storage, true);
|
||||
assert tmpFile.exists();
|
||||
try {
|
||||
success = tmpFile.renameTo(storage.getInProgressEditLog(
|
||||
segment.getStartTxId()));
|
||||
} finally {
|
||||
if (!success) {
|
||||
if (!tmpFile.delete()) {
|
||||
LOG.warn("Failed to delete temporary file " + tmpFile);
|
||||
}
|
||||
}
|
||||
}
|
||||
SecurityUtil.doAsLoginUser(
|
||||
new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws IOException {
|
||||
TransferFsImage.doGetUrl(url, localPaths, storage, true);
|
||||
assert tmpFile.exists();
|
||||
boolean success = false;
|
||||
try {
|
||||
success = tmpFile.renameTo(storage.getInProgressEditLog(
|
||||
segment.getStartTxId()));
|
||||
} finally {
|
||||
if (!success) {
|
||||
if (!tmpFile.delete()) {
|
||||
LOG.warn("Failed to delete temporary file " + tmpFile);
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
|
|||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
||||
|
@ -133,6 +134,10 @@ public class JournalNode implements Tool, Configurable {
|
|||
JvmMetrics.create("JournalNode",
|
||||
conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY),
|
||||
DefaultMetricsSystem.instance());
|
||||
|
||||
InetSocketAddress socAddr = JournalNodeRpcServer.getAddress(conf);
|
||||
SecurityUtil.login(conf, DFSConfigKeys.DFS_JOURNALNODE_KEYTAB_FILE_KEY,
|
||||
DFSConfigKeys.DFS_JOURNALNODE_USER_NAME_KEY, socAddr.getHostName());
|
||||
|
||||
httpServer = new JournalNodeHttpServer(conf, this);
|
||||
httpServer.start();
|
||||
|
|
|
@ -19,7 +19,7 @@ package org.apache.hadoop.hdfs.qjournal.server;
|
|||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_KEYTAB_FILE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_USER_NAME_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_INTERNAL_SPNEGO_USER_NAME_KEY;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
@ -34,8 +34,9 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|||
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
||||
import org.apache.hadoop.http.HttpServer;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
|
||||
/**
|
||||
* Encapsulates the HTTP server started by the Journal Service.
|
||||
|
@ -62,8 +63,9 @@ public class JournalNodeHttpServer {
|
|||
final InetSocketAddress bindAddr = getAddress(conf);
|
||||
|
||||
// initialize the webserver for uploading/downloading files.
|
||||
LOG.info("Starting web server as: "
|
||||
+ UserGroupInformation.getCurrentUser().getUserName());
|
||||
LOG.info("Starting web server as: "+ SecurityUtil.getServerPrincipal(conf
|
||||
.get(DFS_JOURNALNODE_INTERNAL_SPNEGO_USER_NAME_KEY),
|
||||
bindAddr.getHostName()));
|
||||
|
||||
int tmpInfoPort = bindAddr.getPort();
|
||||
httpServer = new HttpServer("journal", bindAddr.getHostName(),
|
||||
|
@ -71,7 +73,7 @@ public class JournalNodeHttpServer {
|
|||
.get(DFS_ADMIN, " "))) {
|
||||
{
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
initSpnego(conf, DFS_JOURNALNODE_USER_NAME_KEY,
|
||||
initSpnego(conf, DFS_JOURNALNODE_INTERNAL_SPNEGO_USER_NAME_KEY,
|
||||
DFS_JOURNALNODE_KEYTAB_FILE_KEY);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,7 +22,9 @@ import java.net.InetSocketAddress;
|
|||
import java.net.URL;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
|
||||
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
|
||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
|
||||
|
@ -65,6 +67,12 @@ class JournalNodeRpcServer implements QJournalProtocol {
|
|||
service, addr.getHostName(),
|
||||
addr.getPort(), HANDLER_COUNT, false, conf,
|
||||
null /*secretManager*/);
|
||||
|
||||
// set service-level authorization security policy
|
||||
if (conf.getBoolean(
|
||||
CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
|
||||
server.refreshServiceAcl(conf, new HDFSPolicyProvider());
|
||||
}
|
||||
}
|
||||
|
||||
void start() {
|
||||
|
@ -83,7 +91,7 @@ class JournalNodeRpcServer implements QJournalProtocol {
|
|||
this.server.stop();
|
||||
}
|
||||
|
||||
private static InetSocketAddress getAddress(Configuration conf) {
|
||||
static InetSocketAddress getAddress(Configuration conf) {
|
||||
String addr = conf.get(
|
||||
DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY,
|
||||
DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_DEFAULT);
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URL;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -367,30 +368,36 @@ public class EditLogFileInputStream extends EditLogInputStream {
|
|||
|
||||
@Override
|
||||
public InputStream getInputStream() throws IOException {
|
||||
HttpURLConnection connection = (HttpURLConnection)
|
||||
SecurityUtil.openSecureHttpConnection(url);
|
||||
|
||||
if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
|
||||
throw new HttpGetFailedException(
|
||||
"Fetch of " + url +
|
||||
" failed with status code " + connection.getResponseCode() +
|
||||
"\nResponse message:\n" + connection.getResponseMessage(),
|
||||
connection);
|
||||
}
|
||||
|
||||
String contentLength = connection.getHeaderField(CONTENT_LENGTH);
|
||||
if (contentLength != null) {
|
||||
advertisedSize = Long.parseLong(contentLength);
|
||||
if (advertisedSize <= 0) {
|
||||
throw new IOException("Invalid " + CONTENT_LENGTH + " header: " +
|
||||
contentLength);
|
||||
}
|
||||
} else {
|
||||
throw new IOException(CONTENT_LENGTH + " header is not provided " +
|
||||
"by the server when trying to fetch " + url);
|
||||
}
|
||||
|
||||
return connection.getInputStream();
|
||||
return SecurityUtil.doAsCurrentUser(
|
||||
new PrivilegedExceptionAction<InputStream>() {
|
||||
@Override
|
||||
public InputStream run() throws IOException {
|
||||
HttpURLConnection connection = (HttpURLConnection)
|
||||
SecurityUtil.openSecureHttpConnection(url);
|
||||
|
||||
if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
|
||||
throw new HttpGetFailedException(
|
||||
"Fetch of " + url +
|
||||
" failed with status code " + connection.getResponseCode() +
|
||||
"\nResponse message:\n" + connection.getResponseMessage(),
|
||||
connection);
|
||||
}
|
||||
|
||||
String contentLength = connection.getHeaderField(CONTENT_LENGTH);
|
||||
if (contentLength != null) {
|
||||
advertisedSize = Long.parseLong(contentLength);
|
||||
if (advertisedSize <= 0) {
|
||||
throw new IOException("Invalid " + CONTENT_LENGTH + " header: " +
|
||||
contentLength);
|
||||
}
|
||||
} else {
|
||||
throw new IOException(CONTENT_LENGTH + " header is not provided " +
|
||||
"by the server when trying to fetch " + url);
|
||||
}
|
||||
|
||||
return connection.getInputStream();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -707,6 +707,12 @@ public class NameNode {
|
|||
String namenodeId = HAUtil.getNameNodeId(conf, nsId);
|
||||
initializeGenericKeys(conf, nsId, namenodeId);
|
||||
checkAllowFormat(conf);
|
||||
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
InetSocketAddress socAddr = getAddress(conf);
|
||||
SecurityUtil.login(conf, DFS_NAMENODE_KEYTAB_FILE_KEY,
|
||||
DFS_NAMENODE_USER_NAME_KEY, socAddr.getHostName());
|
||||
}
|
||||
|
||||
Collection<URI> nameDirsToFormat = FSNamesystem.getNamespaceDirs(conf);
|
||||
List<URI> sharedDirs = FSNamesystem.getSharedEditsDirs(conf);
|
||||
|
@ -748,13 +754,13 @@ public class NameNode {
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public static boolean initializeSharedEdits(Configuration conf) {
|
||||
public static boolean initializeSharedEdits(Configuration conf) throws IOException {
|
||||
return initializeSharedEdits(conf, true);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public static boolean initializeSharedEdits(Configuration conf,
|
||||
boolean force) {
|
||||
boolean force) throws IOException {
|
||||
return initializeSharedEdits(conf, force, false);
|
||||
}
|
||||
|
||||
|
@ -768,7 +774,7 @@ public class NameNode {
|
|||
* @return true if the command aborts, false otherwise
|
||||
*/
|
||||
private static boolean initializeSharedEdits(Configuration conf,
|
||||
boolean force, boolean interactive) {
|
||||
boolean force, boolean interactive) throws IOException {
|
||||
String nsId = DFSUtil.getNamenodeNameServiceId(conf);
|
||||
String namenodeId = HAUtil.getNameNodeId(conf, nsId);
|
||||
initializeGenericKeys(conf, nsId, namenodeId);
|
||||
|
@ -779,6 +785,12 @@ public class NameNode {
|
|||
return false;
|
||||
}
|
||||
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
InetSocketAddress socAddr = getAddress(conf);
|
||||
SecurityUtil.login(conf, DFS_NAMENODE_KEYTAB_FILE_KEY,
|
||||
DFS_NAMENODE_USER_NAME_KEY, socAddr.getHostName());
|
||||
}
|
||||
|
||||
NNStorage existingStorage = null;
|
||||
try {
|
||||
Configuration confWithoutShared = new Configuration(conf);
|
||||
|
|
|
@ -166,13 +166,13 @@ public class TestInitializeSharedEdits {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testDontOverWriteExistingDir() {
|
||||
public void testDontOverWriteExistingDir() throws IOException {
|
||||
assertFalse(NameNode.initializeSharedEdits(conf, false));
|
||||
assertTrue(NameNode.initializeSharedEdits(conf, false));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInitializeSharedEditsConfiguresGenericConfKeys() {
|
||||
public void testInitializeSharedEditsConfiguresGenericConfKeys() throws IOException {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(DFSConfigKeys.DFS_NAMESERVICES, "ns1");
|
||||
conf.set(DFSUtil.addKeySuffixes(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX,
|
||||
|
|
Loading…
Reference in New Issue