HDFS-13040. Kerberized inotify client fails despite kinit properly. Contributed by Istvan Fajth, Wei-Chiu Chuang, Xiao Chen.
This commit is contained in:
parent
c245050538
commit
0882725c88
|
@ -33,6 +33,7 @@ import static org.apache.hadoop.util.Time.now;
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
@ -175,6 +176,7 @@ import org.apache.hadoop.ipc.RefreshResponse;
|
||||||
import org.apache.hadoop.net.Node;
|
import org.apache.hadoop.net.Node;
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
import org.apache.hadoop.security.Groups;
|
import org.apache.hadoop.security.Groups;
|
||||||
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.authorize.AuthorizationException;
|
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||||
import org.apache.hadoop.security.authorize.ProxyUsers;
|
import org.apache.hadoop.security.authorize.ProxyUsers;
|
||||||
|
@ -2113,15 +2115,15 @@ public class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override // ClientProtocol
|
@Override // ClientProtocol
|
||||||
public EventBatchList getEditsFromTxid(long txid) throws IOException {
|
public EventBatchList getEditsFromTxid(final long txid) throws IOException {
|
||||||
checkNNStartup();
|
checkNNStartup();
|
||||||
namesystem.checkOperation(OperationCategory.READ); // only active
|
namesystem.checkOperation(OperationCategory.READ); // only active
|
||||||
namesystem.checkSuperuserPrivilege();
|
namesystem.checkSuperuserPrivilege();
|
||||||
int maxEventsPerRPC = nn.getConf().getInt(
|
final int maxEventsPerRPC = nn.getConf().getInt(
|
||||||
DFSConfigKeys.DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_KEY,
|
DFSConfigKeys.DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_KEY,
|
||||||
DFSConfigKeys.DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_DEFAULT);
|
DFSConfigKeys.DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_DEFAULT);
|
||||||
FSEditLog log = namesystem.getFSImage().getEditLog();
|
final FSEditLog log = namesystem.getFSImage().getEditLog();
|
||||||
long syncTxid = log.getSyncTxId();
|
final long syncTxid = log.getSyncTxId();
|
||||||
// If we haven't synced anything yet, we can only read finalized
|
// If we haven't synced anything yet, we can only read finalized
|
||||||
// segments since we can't reliably determine which txns in in-progress
|
// segments since we can't reliably determine which txns in in-progress
|
||||||
// segments have actually been committed (e.g. written to a quorum of JNs).
|
// segments have actually been committed (e.g. written to a quorum of JNs).
|
||||||
|
@ -2130,8 +2132,26 @@ public class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
// journals. (In-progress segments written by old writers are already
|
// journals. (In-progress segments written by old writers are already
|
||||||
// discarded for us, so if we read any in-progress segments they are
|
// discarded for us, so if we read any in-progress segments they are
|
||||||
// guaranteed to have been written by this NameNode.)
|
// guaranteed to have been written by this NameNode.)
|
||||||
boolean readInProgress = syncTxid > 0;
|
final boolean readInProgress = syncTxid > 0;
|
||||||
|
|
||||||
|
// doas the NN login user for the actual operations to get edits.
|
||||||
|
// Notably this is necessary when polling from the remote edits via https.
|
||||||
|
// We have validated the client is a superuser from the NN RPC, so this
|
||||||
|
// running as the login user here is safe.
|
||||||
|
EventBatchList ret = SecurityUtil.doAsLoginUser(
|
||||||
|
new PrivilegedExceptionAction<EventBatchList>() {
|
||||||
|
@Override
|
||||||
|
public EventBatchList run() throws IOException {
|
||||||
|
return getEventBatchList(syncTxid, txid, log, readInProgress,
|
||||||
|
maxEventsPerRPC);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
private EventBatchList getEventBatchList(long syncTxid, long txid,
|
||||||
|
FSEditLog log, boolean readInProgress, int maxEventsPerRPC)
|
||||||
|
throws IOException {
|
||||||
List<EventBatch> batches = Lists.newArrayList();
|
List<EventBatch> batches = Lists.newArrayList();
|
||||||
int totalEvents = 0;
|
int totalEvents = 0;
|
||||||
long maxSeenTxid = -1;
|
long maxSeenTxid = -1;
|
||||||
|
@ -2150,7 +2170,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
// and are using QJM -- the edit log will be closed and this exception
|
// and are using QJM -- the edit log will be closed and this exception
|
||||||
// will result
|
// will result
|
||||||
LOG.info("NN is transitioning from active to standby and FSEditLog " +
|
LOG.info("NN is transitioning from active to standby and FSEditLog " +
|
||||||
"is closed -- could not read edits");
|
"is closed -- could not read edits");
|
||||||
return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid);
|
return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -52,7 +52,8 @@ public class MiniQJMHACluster {
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
private StartupOption startOpt = null;
|
private StartupOption startOpt = null;
|
||||||
private final MiniDFSCluster.Builder dfsBuilder;
|
private final MiniDFSCluster.Builder dfsBuilder;
|
||||||
|
private boolean forceRemoteEditsOnly = false;
|
||||||
|
|
||||||
public Builder(Configuration conf) {
|
public Builder(Configuration conf) {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
// most QJMHACluster tests don't need DataNodes, so we'll make
|
// most QJMHACluster tests don't need DataNodes, so we'll make
|
||||||
|
@ -71,6 +72,11 @@ public class MiniQJMHACluster {
|
||||||
public void startupOption(StartupOption startOpt) {
|
public void startupOption(StartupOption startOpt) {
|
||||||
this.startOpt = startOpt;
|
this.startOpt = startOpt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder setForceRemoteEditsOnly(boolean val) {
|
||||||
|
this.forceRemoteEditsOnly = val;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static MiniDFSNNTopology createDefaultTopology(int basePort) {
|
public static MiniDFSNNTopology createDefaultTopology(int basePort) {
|
||||||
|
@ -100,7 +106,7 @@ public class MiniQJMHACluster {
|
||||||
// start cluster with 2 NameNodes
|
// start cluster with 2 NameNodes
|
||||||
MiniDFSNNTopology topology = createDefaultTopology(basePort);
|
MiniDFSNNTopology topology = createDefaultTopology(basePort);
|
||||||
|
|
||||||
initHAConf(journalURI, builder.conf, basePort);
|
initHAConf(journalURI, builder, basePort);
|
||||||
|
|
||||||
// First start up the NNs just to format the namespace. The MinIDFSCluster
|
// First start up the NNs just to format the namespace. The MinIDFSCluster
|
||||||
// has no way to just format the NameNodes without also starting them.
|
// has no way to just format the NameNodes without also starting them.
|
||||||
|
@ -131,11 +137,16 @@ public class MiniQJMHACluster {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Configuration initHAConf(URI journalURI, Configuration conf,
|
private Configuration initHAConf(URI journalURI, Builder builder,
|
||||||
int basePort) {
|
int basePort) {
|
||||||
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
|
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
|
||||||
journalURI.toString());
|
journalURI.toString());
|
||||||
|
if (builder.forceRemoteEditsOnly) {
|
||||||
|
conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, journalURI.toString());
|
||||||
|
conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY,
|
||||||
|
journalURI.toString());
|
||||||
|
}
|
||||||
|
|
||||||
String address1 = "127.0.0.1:" + basePort;
|
String address1 = "127.0.0.1:" + basePort;
|
||||||
String address2 = "127.0.0.1:" + (basePort + 2);
|
String address2 = "127.0.0.1:" + (basePort + 2);
|
||||||
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
|
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
|
||||||
|
|
Loading…
Reference in New Issue