YARN-5098. Fixed ResourceManager's DelegationTokenRenewer to replace expiring system-tokens if RM stops and only restarts after a long time. Contributed by Jian He.
Made one minor edit for branch-2 patch. (cherry picked from commitf10ebc67f5
) (cherry picked from commit029888871f
)
This commit is contained in:
parent
3c2bd19fa5
commit
c87b9c1471
|
@ -53,6 +53,7 @@ import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.token.SecretManager;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
|
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
|
@ -459,6 +460,18 @@ public class DelegationTokenRenewer extends AbstractService {
|
||||||
try {
|
try {
|
||||||
renewToken(dttr);
|
renewToken(dttr);
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
|
if (ioe instanceof SecretManager.InvalidToken
|
||||||
|
&& dttr.maxDate < Time.now()
|
||||||
|
&& evt instanceof DelegationTokenRenewerAppRecoverEvent
|
||||||
|
&& token.getKind().equals(HDFS_DELEGATION_KIND)) {
|
||||||
|
LOG.info("Failed to renew hdfs token " + dttr
|
||||||
|
+ " on recovery as it expired, requesting new hdfs token for "
|
||||||
|
+ applicationId + ", user=" + evt.getUser(), ioe);
|
||||||
|
requestNewHdfsDelegationTokenAsProxyUser(
|
||||||
|
Arrays.asList(applicationId), evt.getUser(),
|
||||||
|
evt.shouldCancelAtEnd());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
throw new IOException("Failed to renew token: " + dttr.token, ioe);
|
throw new IOException("Failed to renew token: " + dttr.token, ioe);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -485,7 +498,8 @@ public class DelegationTokenRenewer extends AbstractService {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!hasHdfsToken) {
|
if (!hasHdfsToken) {
|
||||||
requestNewHdfsDelegationToken(Arrays.asList(applicationId), evt.getUser(),
|
requestNewHdfsDelegationTokenAsProxyUser(Arrays.asList(applicationId),
|
||||||
|
evt.getUser(),
|
||||||
shouldCancelAtEnd);
|
shouldCancelAtEnd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -586,8 +600,7 @@ public class DelegationTokenRenewer extends AbstractService {
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
throw new IOException(e);
|
throw new IOException(e);
|
||||||
}
|
}
|
||||||
LOG.info("Renewed delegation-token= [" + dttr + "], for "
|
LOG.info("Renewed delegation-token= [" + dttr + "]");
|
||||||
+ dttr.referringAppIds);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Request new hdfs token if the token is about to expire, and remove the old
|
// Request new hdfs token if the token is about to expire, and remove the old
|
||||||
|
@ -625,12 +638,12 @@ public class DelegationTokenRenewer extends AbstractService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
LOG.info("Token= (" + dttr + ") is expiring, request new token.");
|
LOG.info("Token= (" + dttr + ") is expiring, request new token.");
|
||||||
requestNewHdfsDelegationToken(applicationIds, dttr.user,
|
requestNewHdfsDelegationTokenAsProxyUser(applicationIds, dttr.user,
|
||||||
dttr.shouldCancelAtEnd);
|
dttr.shouldCancelAtEnd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void requestNewHdfsDelegationToken(
|
private void requestNewHdfsDelegationTokenAsProxyUser(
|
||||||
Collection<ApplicationId> referringAppIds,
|
Collection<ApplicationId> referringAppIds,
|
||||||
String user, boolean shouldCancelAtEnd) throws IOException,
|
String user, boolean shouldCancelAtEnd) throws IOException,
|
||||||
InterruptedException {
|
InterruptedException {
|
||||||
|
@ -912,8 +925,8 @@ public class DelegationTokenRenewer extends AbstractService {
|
||||||
// Setup tokens for renewal during recovery
|
// Setup tokens for renewal during recovery
|
||||||
DelegationTokenRenewer.this.handleAppSubmitEvent(event);
|
DelegationTokenRenewer.this.handleAppSubmitEvent(event);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
LOG.warn(
|
LOG.warn("Unable to add the application to the delegation token"
|
||||||
"Unable to add the application to the delegation token renewer.", t);
|
+ " renewer on recovery.", t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -43,6 +43,7 @@ import java.util.concurrent.CyclicBarrier;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -84,6 +85,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||||
|
@ -968,6 +970,101 @@ public class TestDelegationTokenRenewer {
|
||||||
Assert.assertTrue(appCredentials.getAllTokens().contains(expectedToken));
|
Assert.assertTrue(appCredentials.getAllTokens().contains(expectedToken));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// 1. token is expired before app completes.
|
||||||
|
// 2. RM shutdown.
|
||||||
|
// 3. When RM recovers the app, token renewal will fail as token expired.
|
||||||
|
// RM should request a new token and sent it to NM for log-aggregation.
|
||||||
|
@Test
|
||||||
|
public void testRMRestartWithExpiredToken() throws Exception {
|
||||||
|
Configuration yarnConf = new YarnConfiguration();
|
||||||
|
yarnConf
|
||||||
|
.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, true);
|
||||||
|
yarnConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||||
|
"kerberos");
|
||||||
|
yarnConf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||||
|
yarnConf
|
||||||
|
.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||||
|
UserGroupInformation.setConfiguration(yarnConf);
|
||||||
|
|
||||||
|
// create Token1:
|
||||||
|
Text userText1 = new Text("user1");
|
||||||
|
DelegationTokenIdentifier dtId1 = new DelegationTokenIdentifier(userText1,
|
||||||
|
new Text("renewer1"), userText1);
|
||||||
|
final Token<DelegationTokenIdentifier> originalToken =
|
||||||
|
new Token<>(dtId1.getBytes(), "password1".getBytes(), dtId1.getKind(),
|
||||||
|
new Text("service1"));
|
||||||
|
Credentials credentials = new Credentials();
|
||||||
|
credentials.addToken(userText1, originalToken);
|
||||||
|
|
||||||
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||||
|
memStore.init(yarnConf);
|
||||||
|
MockRM rm1 = new TestSecurityMockRM(yarnConf, memStore);
|
||||||
|
rm1.start();
|
||||||
|
RMApp app = rm1.submitApp(200, "name", "user",
|
||||||
|
new HashMap<ApplicationAccessType, String>(), false, "default", 1,
|
||||||
|
credentials);
|
||||||
|
|
||||||
|
// create token2
|
||||||
|
Text userText2 = new Text("user1");
|
||||||
|
DelegationTokenIdentifier dtId2 =
|
||||||
|
new DelegationTokenIdentifier(userText1, new Text("renewer2"),
|
||||||
|
userText2);
|
||||||
|
final Token<DelegationTokenIdentifier> updatedToken =
|
||||||
|
new Token<DelegationTokenIdentifier>(dtId2.getBytes(),
|
||||||
|
"password2".getBytes(), dtId2.getKind(), new Text("service2"));
|
||||||
|
final AtomicBoolean firstRenewInvoked = new AtomicBoolean(false);
|
||||||
|
final AtomicBoolean secondRenewInvoked = new AtomicBoolean(false);
|
||||||
|
MockRM rm2 = new TestSecurityMockRM(yarnConf, memStore) {
|
||||||
|
@Override
|
||||||
|
protected DelegationTokenRenewer createDelegationTokenRenewer() {
|
||||||
|
return new DelegationTokenRenewer() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void renewToken(final DelegationTokenToRenew dttr)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
|
if (dttr.token.equals(updatedToken)) {
|
||||||
|
secondRenewInvoked.set(true);
|
||||||
|
super.renewToken(dttr);
|
||||||
|
} else if (dttr.token.equals(originalToken)){
|
||||||
|
firstRenewInvoked.set(true);
|
||||||
|
throw new InvalidToken("Failed to renew");
|
||||||
|
} else {
|
||||||
|
throw new IOException("Unexpected");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Token<?>[] obtainSystemTokensForUser(String user,
|
||||||
|
final Credentials credentials) throws IOException {
|
||||||
|
credentials.addToken(updatedToken.getService(), updatedToken);
|
||||||
|
return new Token<?>[] { updatedToken };
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// simulating restart the rm
|
||||||
|
rm2.start();
|
||||||
|
|
||||||
|
// check nm can retrieve the token
|
||||||
|
final MockNM nm1 =
|
||||||
|
new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
|
||||||
|
nm1.registerNode();
|
||||||
|
NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
|
||||||
|
ByteBuffer tokenBuffer =
|
||||||
|
response.getSystemCredentialsForApps().get(app.getApplicationId());
|
||||||
|
Assert.assertNotNull(tokenBuffer);
|
||||||
|
Credentials appCredentials = new Credentials();
|
||||||
|
DataInputByteBuffer buf = new DataInputByteBuffer();
|
||||||
|
tokenBuffer.rewind();
|
||||||
|
buf.reset(tokenBuffer);
|
||||||
|
appCredentials.readTokenStorageStream(buf);
|
||||||
|
Assert.assertTrue(firstRenewInvoked.get() && secondRenewInvoked.get());
|
||||||
|
Assert.assertTrue(appCredentials.getAllTokens().contains(updatedToken));
|
||||||
|
}
|
||||||
|
|
||||||
// YARN will get the token for the app submitted without the delegation token.
|
// YARN will get the token for the app submitted without the delegation token.
|
||||||
@Test
|
@Test
|
||||||
public void testAppSubmissionWithoutDelegationToken() throws Exception {
|
public void testAppSubmissionWithoutDelegationToken() throws Exception {
|
||||||
|
@ -1161,4 +1258,5 @@ public class TestDelegationTokenRenewer {
|
||||||
Assert.assertTrue(dttr.isTimerCancelled());
|
Assert.assertTrue(dttr.isTimerCancelled());
|
||||||
Assert.assertTrue(Renewer.cancelled);
|
Assert.assertTrue(Renewer.cancelled);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue