YARN-5594. Handle old RMDelegationToken format when recovering RM (rkanter)

(cherry picked from commit d8863fc16f)
This commit is contained in:
Robert Kanter 2017-12-04 13:14:55 -08:00
parent 274bbb193f
commit 54b5da83ec
9 changed files with 282 additions and 58 deletions

View File

@ -22,6 +22,7 @@ import java.io.DataOutput;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
@ -64,6 +65,11 @@ public abstract class YARNDelegationTokenIdentifier extends
setMasterKeyId(builder.getMasterKeyId()); setMasterKeyId(builder.getMasterKeyId());
} }
public synchronized void readFieldsInOldFormat(DataInput in)
throws IOException {
super.readFields(in);
}
private void setBuilderFields() { private void setBuilderFields() {
if (builder.getOwner() != null && if (builder.getOwner() != null &&
!builder.getOwner().equals(getOwner().toString())) { !builder.getOwner().equals(getOwner().toString())) {
@ -97,6 +103,11 @@ public abstract class YARNDelegationTokenIdentifier extends
builder.build().writeTo((DataOutputStream) out); builder.build().writeTo((DataOutputStream) out);
} }
@VisibleForTesting
public synchronized void writeInOldFormat(DataOutput out) throws IOException {
super.write(out);
}
public YARNDelegationTokenIdentifierProto getProto() { public YARNDelegationTokenIdentifierProto getProto() {
setBuilderFields(); setBuilderFields();
return builder.build(); return builder.build();

View File

@ -24,6 +24,7 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.HadoopKerberosName; import org.apache.hadoop.security.HadoopKerberosName;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -217,6 +218,16 @@ public class TestYARNTokenIdentifier {
@Test @Test
public void testRMDelegationTokenIdentifier() throws IOException { public void testRMDelegationTokenIdentifier() throws IOException {
testRMDelegationTokenIdentifier(false);
}
@Test
public void testRMDelegationTokenIdentifierOldFormat() throws IOException {
testRMDelegationTokenIdentifier(true);
}
public void testRMDelegationTokenIdentifier(boolean oldFormat)
throws IOException {
Text owner = new Text("user1"); Text owner = new Text("user1");
Text renewer = new Text("user2"); Text renewer = new Text("user2");
@ -226,58 +237,62 @@ public class TestYARNTokenIdentifier {
int sequenceNumber = 3; int sequenceNumber = 3;
int masterKeyId = 4; int masterKeyId = 4;
RMDelegationTokenIdentifier token = RMDelegationTokenIdentifier originalToken =
new RMDelegationTokenIdentifier(owner, renewer, realUser); new RMDelegationTokenIdentifier(owner, renewer, realUser);
token.setIssueDate(issueDate); originalToken.setIssueDate(issueDate);
token.setMaxDate(maxDate); originalToken.setMaxDate(maxDate);
token.setSequenceNumber(sequenceNumber); originalToken.setSequenceNumber(sequenceNumber);
token.setMasterKeyId(masterKeyId); originalToken.setMasterKeyId(masterKeyId);
RMDelegationTokenIdentifier anotherToken = new RMDelegationTokenIdentifier(); RMDelegationTokenIdentifier anotherToken
= new RMDelegationTokenIdentifier();
byte[] tokenContent = token.getBytes(); if (oldFormat) {
DataInputBuffer inBuf = new DataInputBuffer();
DataOutputBuffer outBuf = new DataOutputBuffer();
originalToken.writeInOldFormat(outBuf);
inBuf.reset(outBuf.getData(), 0, outBuf.getLength());
anotherToken.readFieldsInOldFormat(inBuf);
inBuf.close();
} else {
byte[] tokenContent = originalToken.getBytes();
DataInputBuffer dib = new DataInputBuffer(); DataInputBuffer dib = new DataInputBuffer();
dib.reset(tokenContent, tokenContent.length); dib.reset(tokenContent, tokenContent.length);
anotherToken.readFields(dib); anotherToken.readFields(dib);
dib.close(); dib.close();
}
// verify the whole record equals with original record // verify the whole record equals with original record
Assert.assertEquals("Token is not the same after serialization " + Assert.assertEquals(
"and deserialization.", token, anotherToken); "Token is not the same after serialization and deserialization.",
originalToken, anotherToken);
Assert.assertEquals("owner from proto is not the same with original token", Assert.assertEquals(
anotherToken.getOwner(), owner); "owner from proto is not the same with original token",
owner, anotherToken.getOwner());
Assert.assertEquals("renewer from proto is not the same with original token", Assert.assertEquals(
anotherToken.getRenewer(), renewer); "renewer from proto is not the same with original token",
renewer, anotherToken.getRenewer());
Assert.assertEquals("realUser from proto is not the same with original token", Assert.assertEquals(
anotherToken.getRealUser(), realUser); "realUser from proto is not the same with original token",
realUser, anotherToken.getRealUser());
Assert.assertEquals("issueDate from proto is not the same with original token", Assert.assertEquals(
anotherToken.getIssueDate(), issueDate); "issueDate from proto is not the same with original token",
issueDate, anotherToken.getIssueDate());
Assert.assertEquals("maxDate from proto is not the same with original token", Assert.assertEquals(
anotherToken.getMaxDate(), maxDate); "maxDate from proto is not the same with original token",
maxDate, anotherToken.getMaxDate());
Assert.assertEquals("sequenceNumber from proto is not the same with original token", Assert.assertEquals(
anotherToken.getSequenceNumber(), sequenceNumber); "sequenceNumber from proto is not the same with original token",
sequenceNumber, anotherToken.getSequenceNumber());
Assert.assertEquals("masterKeyId from proto is not the same with original token", Assert.assertEquals(
anotherToken.getMasterKeyId(), masterKeyId); "masterKeyId from proto is not the same with original token",
masterKeyId, anotherToken.getMasterKeyId());
// Test getProto // Test getProto
RMDelegationTokenIdentifier token1 = YARNDelegationTokenIdentifierProto tokenProto = originalToken.getProto();
new RMDelegationTokenIdentifier(owner, renewer, realUser);
token1.setIssueDate(issueDate);
token1.setMaxDate(maxDate);
token1.setSequenceNumber(sequenceNumber);
token1.setMasterKeyId(masterKeyId);
YARNDelegationTokenIdentifierProto tokenProto = token1.getProto();
// Write token proto to stream // Write token proto to stream
ByteArrayOutputStream baos = new ByteArrayOutputStream(); ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(baos); DataOutputStream out = new DataOutputStream(baos);
tokenProto.writeTo(out); tokenProto.writeTo(out);
// Read token // Read token
byte[] tokenData = baos.toByteArray(); byte[] tokenData = baos.toByteArray();
RMDelegationTokenIdentifier readToken = new RMDelegationTokenIdentifier(); RMDelegationTokenIdentifier readToken = new RMDelegationTokenIdentifier();
@ -287,7 +302,7 @@ public class TestYARNTokenIdentifier {
// Verify if read token equals with original token // Verify if read token equals with original token
Assert.assertEquals("Token from getProto is not the same after " + Assert.assertEquals("Token from getProto is not the same after " +
"serialization and deserialization.", token1, readToken); "serialization and deserialization.", originalToken, readToken);
db.close(); db.close();
out.close(); out.close();
} }

View File

@ -383,12 +383,10 @@ public class FileSystemRMStateStore extends RMStateStore {
} }
} else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) { } else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
RMDelegationTokenIdentifierData identifierData = RMDelegationTokenIdentifierData identifierData =
new RMDelegationTokenIdentifierData(); RMStateStoreUtils.readRMDelegationTokenIdentifierData(fsIn);
identifierData.readFields(fsIn);
RMDelegationTokenIdentifier identifier = RMDelegationTokenIdentifier identifier =
identifierData.getTokenIdentifier(); identifierData.getTokenIdentifier();
long renewDate = identifierData.getRenewDate(); long renewDate = identifierData.getRenewDate();
rmState.rmSecretManagerState.delegationTokenState.put(identifier, rmState.rmSecretManagerState.delegationTokenState.put(identifier,
renewDate); renewDate);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {

View File

@ -410,11 +410,10 @@ public class LeveldbRMStateStore extends RMStateStore {
private RMDelegationTokenIdentifierData loadDelegationToken(byte[] data) private RMDelegationTokenIdentifierData loadDelegationToken(byte[] data)
throws IOException { throws IOException {
RMDelegationTokenIdentifierData tokenData = RMDelegationTokenIdentifierData tokenData = null;
new RMDelegationTokenIdentifierData();
DataInputStream in = new DataInputStream(new ByteArrayInputStream(data)); DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
try { try {
tokenData.readFields(in); tokenData = RMStateStoreUtils.readRMDelegationTokenIdentifierData(in);
} finally { } finally {
IOUtils.cleanup(LOG, in); IOUtils.cleanup(LOG, in);
} }

View File

@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDelegationTokenIdentifierData;
import java.io.DataInputStream;
import java.io.IOException;
/**
* Utility methods for {@link RMStateStore} and subclasses.
*/
@Private
@Unstable
public class RMStateStoreUtils {
public static final Log LOG = LogFactory.getLog(RMStateStoreUtils.class);
/**
* Returns the RM Delegation Token data from the {@link DataInputStream} as a
* {@link RMDelegationTokenIdentifierData}. It can handle both the current
* and old (non-protobuf) formats.
*
* @param fsIn The {@link DataInputStream} containing RM Delegation Token data
* @return An {@link RMDelegationTokenIdentifierData} containing the read in
* RM Delegation Token
*/
public static RMDelegationTokenIdentifierData
readRMDelegationTokenIdentifierData(DataInputStream fsIn)
throws IOException {
RMDelegationTokenIdentifierData identifierData =
new RMDelegationTokenIdentifierData();
try {
identifierData.readFields(fsIn);
} catch (InvalidProtocolBufferException e) {
LOG.warn("Recovering old formatted token");
fsIn.reset();
YARNDelegationTokenIdentifier identifier =
new RMDelegationTokenIdentifier();
identifier.readFieldsInOldFormat(fsIn);
identifierData.setIdentifier(identifier);
identifierData.setRenewDate(fsIn.readLong());
}
return identifierData;
}
}

View File

@ -669,8 +669,7 @@ public class ZKRMStateStore extends RMStateStore {
ByteArrayInputStream is = new ByteArrayInputStream(data); ByteArrayInputStream is = new ByteArrayInputStream(data);
try (DataInputStream fsIn = new DataInputStream(is)) { try (DataInputStream fsIn = new DataInputStream(is)) {
RMDelegationTokenIdentifierData identifierData = RMDelegationTokenIdentifierData identifierData =
new RMDelegationTokenIdentifierData(); RMStateStoreUtils.readRMDelegationTokenIdentifierData(fsIn);
identifierData.readFields(fsIn);
RMDelegationTokenIdentifier identifier = RMDelegationTokenIdentifier identifier =
identifierData.getTokenIdentifier(); identifierData.getTokenIdentifier();
long renewDate = identifierData.getRenewDate(); long renewDate = identifierData.getRenewDate();

View File

@ -58,4 +58,12 @@ public class RMDelegationTokenIdentifierData {
public long getRenewDate() { public long getRenewDate() {
return builder.getRenewDate(); return builder.getRenewDate();
} }
public void setIdentifier(YARNDelegationTokenIdentifier identifier) {
builder.setTokenIdentifier(identifier.getProto());
}
public void setRenewDate(long renewDate) {
builder.setRenewDate(renewDate);
}
} }

View File

@ -36,7 +36,11 @@ import java.net.InetSocketAddress;
import java.security.PrivilegedAction; import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.delegation.TestDelegationToken;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDelegationTokenIdentifierData;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
@ -352,6 +356,46 @@ public class TestClientRMTokens {
false); false);
} }
@Test
public void testReadOldFormatFields() throws IOException {
RMDelegationTokenIdentifier token = new RMDelegationTokenIdentifier(
new Text("alice"), new Text("bob"), new Text("colin"));
token.setIssueDate(123);
token.setMasterKeyId(321);
token.setMaxDate(314);
token.setSequenceNumber(12345);
DataInputBuffer inBuf = new DataInputBuffer();
DataOutputBuffer outBuf = new DataOutputBuffer();
token.writeInOldFormat(outBuf);
outBuf.writeLong(42); // renewDate
inBuf.reset(outBuf.getData(), 0, outBuf.getLength());
RMDelegationTokenIdentifier identifier = null;
try {
RMDelegationTokenIdentifierData identifierData =
new RMDelegationTokenIdentifierData();
identifierData.readFields(inBuf);
fail("Should have thrown a "
+ InvalidProtocolBufferException.class.getName()
+ " because the token is not a protobuf");
} catch (InvalidProtocolBufferException e) {
identifier = new RMDelegationTokenIdentifier();
inBuf.reset();
identifier.readFieldsInOldFormat(inBuf);
assertEquals(42, inBuf.readLong());
}
assertEquals("alice", identifier.getUser().getUserName());
assertEquals(new Text("bob"), identifier.getRenewer());
assertEquals("colin", identifier.getUser().getRealUser().getUserName());
assertEquals(123, identifier.getIssueDate());
assertEquals(321, identifier.getMasterKeyId());
assertEquals(314, identifier.getMaxDate());
assertEquals(12345, identifier.getSequenceNumber());
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void checkShortCircuitRenewCancel(InetSocketAddress rmAddr, private void checkShortCircuitRenewCancel(InetSocketAddress rmAddr,
InetSocketAddress serviceAddr, InetSocketAddress serviceAddr,

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDelegationTokenIdentifierData;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class TestRMStateStoreUtils {
@Test
public void testReadRMDelegationTokenIdentifierData()
throws Exception {
testReadRMDelegationTokenIdentifierData(false);
}
@Test
public void testReadRMDelegationTokenIdentifierDataOldFormat()
throws Exception {
testReadRMDelegationTokenIdentifierData(true);
}
public void testReadRMDelegationTokenIdentifierData(boolean oldFormat)
throws Exception {
RMDelegationTokenIdentifier token = new RMDelegationTokenIdentifier(
new Text("alice"), new Text("bob"), new Text("colin"));
token.setIssueDate(123);
token.setMasterKeyId(321);
token.setMaxDate(314);
token.setSequenceNumber(12345);
DataInputBuffer inBuf = new DataInputBuffer();
if (oldFormat) {
DataOutputBuffer outBuf = new DataOutputBuffer();
token.writeInOldFormat(outBuf);
outBuf.writeLong(42); // renewDate
inBuf.reset(outBuf.getData(), 0, outBuf.getLength());
} else {
RMDelegationTokenIdentifierData tokenIdentifierData
= new RMDelegationTokenIdentifierData(token, 42);
byte[] data = tokenIdentifierData.toByteArray();
inBuf.reset(data, 0, data.length);
}
RMDelegationTokenIdentifierData identifierData
= RMStateStoreUtils.readRMDelegationTokenIdentifierData(inBuf);
assertEquals("Found unexpected data still in the InputStream",
-1, inBuf.read());
RMDelegationTokenIdentifier identifier
= identifierData.getTokenIdentifier();
assertEquals("alice", identifier.getUser().getUserName());
assertEquals(new Text("bob"), identifier.getRenewer());
assertEquals("colin", identifier.getUser().getRealUser().getUserName());
assertEquals(123, identifier.getIssueDate());
assertEquals(321, identifier.getMasterKeyId());
assertEquals(314, identifier.getMaxDate());
assertEquals(12345, identifier.getSequenceNumber());
assertEquals(42, identifierData.getRenewDate());
}
}