HADOOP-18581 : Handle Server KDC re-login when Server and Client run … (#5248)
* HADOOP-18581 : Handle Server KDC re-login when Server and Client run in same JVM.
This commit is contained in:
parent
cd19da1309
commit
a65d24488a
|
@ -123,6 +123,7 @@ import org.apache.hadoop.util.ExitUtil;
|
||||||
import org.apache.hadoop.util.ProtoUtil;
|
import org.apache.hadoop.util.ProtoUtil;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import org.apache.hadoop.tracing.Span;
|
import org.apache.hadoop.tracing.Span;
|
||||||
import org.apache.hadoop.tracing.SpanContext;
|
import org.apache.hadoop.tracing.SpanContext;
|
||||||
import org.apache.hadoop.tracing.TraceScope;
|
import org.apache.hadoop.tracing.TraceScope;
|
||||||
|
@ -153,6 +154,13 @@ public abstract class Server {
|
||||||
private ExceptionsHandler exceptionsHandler = new ExceptionsHandler();
|
private ExceptionsHandler exceptionsHandler = new ExceptionsHandler();
|
||||||
private Tracer tracer;
|
private Tracer tracer;
|
||||||
private AlignmentContext alignmentContext;
|
private AlignmentContext alignmentContext;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allow server to do force Kerberos re-login once after failure irrespective
|
||||||
|
* of the last login time.
|
||||||
|
*/
|
||||||
|
private final AtomicBoolean canTryForceLogin = new AtomicBoolean(true);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Logical name of the server used in metrics and monitor.
|
* Logical name of the server used in metrics and monitor.
|
||||||
*/
|
*/
|
||||||
|
@ -2206,7 +2214,23 @@ public abstract class Server {
|
||||||
AUDITLOG.warn(AUTH_FAILED_FOR + this.toString() + ":"
|
AUDITLOG.warn(AUTH_FAILED_FOR + this.toString() + ":"
|
||||||
+ attemptingUser + " (" + e.getLocalizedMessage()
|
+ attemptingUser + " (" + e.getLocalizedMessage()
|
||||||
+ ") with true cause: (" + tce.getLocalizedMessage() + ")");
|
+ ") with true cause: (" + tce.getLocalizedMessage() + ")");
|
||||||
throw tce;
|
if (!UserGroupInformation.getLoginUser().isLoginSuccess()) {
|
||||||
|
doKerberosRelogin();
|
||||||
|
try {
|
||||||
|
// try processing message again
|
||||||
|
LOG.debug("Reprocessing sasl message for {}:{} after re-login",
|
||||||
|
this.toString(), attemptingUser);
|
||||||
|
saslResponse = processSaslMessage(saslMessage);
|
||||||
|
AUDITLOG.info("Retry {}{}:{} after failure", AUTH_SUCCESSFUL_FOR,
|
||||||
|
this.toString(), attemptingUser);
|
||||||
|
canTryForceLogin.set(true);
|
||||||
|
} catch (IOException exp) {
|
||||||
|
tce = (IOException) getTrueCause(e);
|
||||||
|
throw tce;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
throw tce;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (saslServer != null && saslServer.isComplete()) {
|
if (saslServer != null && saslServer.isComplete()) {
|
||||||
|
@ -3322,6 +3346,26 @@ public abstract class Server {
|
||||||
metricsUpdaterInterval, metricsUpdaterInterval, TimeUnit.MILLISECONDS);
|
metricsUpdaterInterval, metricsUpdaterInterval, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private synchronized void doKerberosRelogin() throws IOException {
|
||||||
|
if(UserGroupInformation.getLoginUser().isLoginSuccess()){
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
LOG.warn("Initiating re-login from IPC Server");
|
||||||
|
if (canTryForceLogin.compareAndSet(true, false)) {
|
||||||
|
if (UserGroupInformation.isLoginKeytabBased()) {
|
||||||
|
UserGroupInformation.getLoginUser().forceReloginFromKeytab();
|
||||||
|
} else if (UserGroupInformation.isLoginTicketBased()) {
|
||||||
|
UserGroupInformation.getLoginUser().forceReloginFromTicketCache();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (UserGroupInformation.isLoginKeytabBased()) {
|
||||||
|
UserGroupInformation.getLoginUser().reloginFromKeytab();
|
||||||
|
} else if (UserGroupInformation.isLoginTicketBased()) {
|
||||||
|
UserGroupInformation.getLoginUser().reloginFromTicketCache();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public synchronized void addAuxiliaryListener(int auxiliaryPort)
|
public synchronized void addAuxiliaryListener(int auxiliaryPort)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (auxiliaryListenerMap == null) {
|
if (auxiliaryListenerMap == null) {
|
||||||
|
|
|
@ -529,6 +529,18 @@ public class UserGroupInformation {
|
||||||
user.setLogin(login);
|
user.setLogin(login);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** This method checks for a successful Kerberos login
|
||||||
|
* and returns true by default if it is not using Kerberos.
|
||||||
|
*
|
||||||
|
* @return true on successful login
|
||||||
|
*/
|
||||||
|
public boolean isLoginSuccess() {
|
||||||
|
LoginContext login = user.getLogin();
|
||||||
|
return (login instanceof HadoopLoginContext)
|
||||||
|
? ((HadoopLoginContext) login).isLoginSuccess()
|
||||||
|
: true;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the last login time for logged in user
|
* Set the last login time for logged in user
|
||||||
* @param loginTime the number of milliseconds since the beginning of time
|
* @param loginTime the number of milliseconds since the beginning of time
|
||||||
|
@ -1276,6 +1288,23 @@ public class UserGroupInformation {
|
||||||
relogin(login, ignoreLastLoginTime);
|
relogin(login, ignoreLastLoginTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Force re-Login a user in from the ticket cache irrespective of the last
|
||||||
|
* login time. This method assumes that login had happened already. The
|
||||||
|
* Subject field of this UserGroupInformation object is updated to have the
|
||||||
|
* new credentials.
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
* raised on errors performing I/O.
|
||||||
|
* @throws KerberosAuthException
|
||||||
|
* on a failure
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public void forceReloginFromTicketCache() throws IOException {
|
||||||
|
reloginFromTicketCache(true);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Re-Login a user in from the ticket cache. This
|
* Re-Login a user in from the ticket cache. This
|
||||||
* method assumes that login had happened already.
|
* method assumes that login had happened already.
|
||||||
|
@ -1287,6 +1316,11 @@ public class UserGroupInformation {
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public void reloginFromTicketCache() throws IOException {
|
public void reloginFromTicketCache() throws IOException {
|
||||||
|
reloginFromTicketCache(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void reloginFromTicketCache(boolean ignoreLastLoginTime)
|
||||||
|
throws IOException {
|
||||||
if (!shouldRelogin() || !isFromTicket()) {
|
if (!shouldRelogin() || !isFromTicket()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1294,7 +1328,7 @@ public class UserGroupInformation {
|
||||||
if (login == null) {
|
if (login == null) {
|
||||||
throw new KerberosAuthException(MUST_FIRST_LOGIN);
|
throw new KerberosAuthException(MUST_FIRST_LOGIN);
|
||||||
}
|
}
|
||||||
relogin(login, false);
|
relogin(login, ignoreLastLoginTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void relogin(HadoopLoginContext login, boolean ignoreLastLoginTime)
|
private void relogin(HadoopLoginContext login, boolean ignoreLastLoginTime)
|
||||||
|
@ -2083,6 +2117,11 @@ public class UserGroupInformation {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Get the login status. */
|
||||||
|
public boolean isLoginSuccess() {
|
||||||
|
return isLoggedIn.get();
|
||||||
|
}
|
||||||
|
|
||||||
String getAppName() {
|
String getAppName() {
|
||||||
return appName;
|
return appName;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue