YARN-5594. Handle old RMDelegationToken format when recovering RM (rkanter)

This commit is contained in:
Robert Kanter 2017-12-04 13:14:55 -08:00
parent d30d57828f
commit d8863fc16f
9 changed files with 282 additions and 58 deletions

View File

@ -22,6 +22,7 @@ import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
@ -64,6 +65,11 @@ public abstract class YARNDelegationTokenIdentifier extends
setMasterKeyId(builder.getMasterKeyId());
}
public synchronized void readFieldsInOldFormat(DataInput in)
throws IOException {
super.readFields(in);
}
private void setBuilderFields() {
if (builder.getOwner() != null &&
!builder.getOwner().equals(getOwner().toString())) {
@ -97,6 +103,11 @@ public abstract class YARNDelegationTokenIdentifier extends
builder.build().writeTo((DataOutputStream) out);
}
@VisibleForTesting
public synchronized void writeInOldFormat(DataOutput out) throws IOException {
super.write(out);
}
public YARNDelegationTokenIdentifierProto getProto() {
setBuilderFields();
return builder.build();

View File

@ -24,6 +24,7 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.HadoopKerberosName;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -214,10 +215,20 @@ public class TestYARNTokenIdentifier {
Assert.assertEquals(ExecutionType.GUARANTEED,
anotherToken.getExecutionType());
}
@Test
public void testRMDelegationTokenIdentifier() throws IOException {
testRMDelegationTokenIdentifier(false);
}
@Test
public void testRMDelegationTokenIdentifierOldFormat() throws IOException {
testRMDelegationTokenIdentifier(true);
}
public void testRMDelegationTokenIdentifier(boolean oldFormat)
throws IOException {
Text owner = new Text("user1");
Text renewer = new Text("user2");
Text realUser = new Text("user3");
@ -225,59 +236,63 @@ public class TestYARNTokenIdentifier {
long maxDate = 2;
int sequenceNumber = 3;
int masterKeyId = 4;
RMDelegationTokenIdentifier token =
RMDelegationTokenIdentifier originalToken =
new RMDelegationTokenIdentifier(owner, renewer, realUser);
token.setIssueDate(issueDate);
token.setMaxDate(maxDate);
token.setSequenceNumber(sequenceNumber);
token.setMasterKeyId(masterKeyId);
RMDelegationTokenIdentifier anotherToken = new RMDelegationTokenIdentifier();
byte[] tokenContent = token.getBytes();
DataInputBuffer dib = new DataInputBuffer();
dib.reset(tokenContent, tokenContent.length);
anotherToken.readFields(dib);
dib.close();
originalToken.setIssueDate(issueDate);
originalToken.setMaxDate(maxDate);
originalToken.setSequenceNumber(sequenceNumber);
originalToken.setMasterKeyId(masterKeyId);
RMDelegationTokenIdentifier anotherToken
= new RMDelegationTokenIdentifier();
if (oldFormat) {
DataInputBuffer inBuf = new DataInputBuffer();
DataOutputBuffer outBuf = new DataOutputBuffer();
originalToken.writeInOldFormat(outBuf);
inBuf.reset(outBuf.getData(), 0, outBuf.getLength());
anotherToken.readFieldsInOldFormat(inBuf);
inBuf.close();
} else {
byte[] tokenContent = originalToken.getBytes();
DataInputBuffer dib = new DataInputBuffer();
dib.reset(tokenContent, tokenContent.length);
anotherToken.readFields(dib);
dib.close();
}
// verify the whole record equals with original record
Assert.assertEquals("Token is not the same after serialization " +
"and deserialization.", token, anotherToken);
Assert.assertEquals("owner from proto is not the same with original token",
anotherToken.getOwner(), owner);
Assert.assertEquals("renewer from proto is not the same with original token",
anotherToken.getRenewer(), renewer);
Assert.assertEquals("realUser from proto is not the same with original token",
anotherToken.getRealUser(), realUser);
Assert.assertEquals("issueDate from proto is not the same with original token",
anotherToken.getIssueDate(), issueDate);
Assert.assertEquals("maxDate from proto is not the same with original token",
anotherToken.getMaxDate(), maxDate);
Assert.assertEquals("sequenceNumber from proto is not the same with original token",
anotherToken.getSequenceNumber(), sequenceNumber);
Assert.assertEquals("masterKeyId from proto is not the same with original token",
anotherToken.getMasterKeyId(), masterKeyId);
// Test getProto
RMDelegationTokenIdentifier token1 =
new RMDelegationTokenIdentifier(owner, renewer, realUser);
token1.setIssueDate(issueDate);
token1.setMaxDate(maxDate);
token1.setSequenceNumber(sequenceNumber);
token1.setMasterKeyId(masterKeyId);
YARNDelegationTokenIdentifierProto tokenProto = token1.getProto();
Assert.assertEquals(
"Token is not the same after serialization and deserialization.",
originalToken, anotherToken);
Assert.assertEquals(
"owner from proto is not the same with original token",
owner, anotherToken.getOwner());
Assert.assertEquals(
"renewer from proto is not the same with original token",
renewer, anotherToken.getRenewer());
Assert.assertEquals(
"realUser from proto is not the same with original token",
realUser, anotherToken.getRealUser());
Assert.assertEquals(
"issueDate from proto is not the same with original token",
issueDate, anotherToken.getIssueDate());
Assert.assertEquals(
"maxDate from proto is not the same with original token",
maxDate, anotherToken.getMaxDate());
Assert.assertEquals(
"sequenceNumber from proto is not the same with original token",
sequenceNumber, anotherToken.getSequenceNumber());
Assert.assertEquals(
"masterKeyId from proto is not the same with original token",
masterKeyId, anotherToken.getMasterKeyId());
// Test getProto
YARNDelegationTokenIdentifierProto tokenProto = originalToken.getProto();
// Write token proto to stream
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(baos);
tokenProto.writeTo(out);
// Read token
byte[] tokenData = baos.toByteArray();
RMDelegationTokenIdentifier readToken = new RMDelegationTokenIdentifier();
@ -287,7 +302,7 @@ public class TestYARNTokenIdentifier {
// Verify if read token equals with original token
Assert.assertEquals("Token from getProto is not the same after " +
"serialization and deserialization.", token1, readToken);
"serialization and deserialization.", originalToken, readToken);
db.close();
out.close();
}

View File

@ -378,12 +378,10 @@ public class FileSystemRMStateStore extends RMStateStore {
}
} else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
RMDelegationTokenIdentifierData identifierData =
new RMDelegationTokenIdentifierData();
identifierData.readFields(fsIn);
RMStateStoreUtils.readRMDelegationTokenIdentifierData(fsIn);
RMDelegationTokenIdentifier identifier =
identifierData.getTokenIdentifier();
long renewDate = identifierData.getRenewDate();
rmState.rmSecretManagerState.delegationTokenState.put(identifier,
renewDate);
if (LOG.isDebugEnabled()) {

View File

@ -410,11 +410,10 @@ public class LeveldbRMStateStore extends RMStateStore {
private RMDelegationTokenIdentifierData loadDelegationToken(byte[] data)
throws IOException {
RMDelegationTokenIdentifierData tokenData =
new RMDelegationTokenIdentifierData();
RMDelegationTokenIdentifierData tokenData = null;
DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
try {
tokenData.readFields(in);
tokenData = RMStateStoreUtils.readRMDelegationTokenIdentifierData(in);
} finally {
IOUtils.cleanup(LOG, in);
}

View File

@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDelegationTokenIdentifierData;
import java.io.DataInputStream;
import java.io.IOException;
/**
* Utility methods for {@link RMStateStore} and subclasses.
*/
@Private
@Unstable
public class RMStateStoreUtils {
public static final Log LOG = LogFactory.getLog(RMStateStoreUtils.class);
/**
* Returns the RM Delegation Token data from the {@link DataInputStream} as a
* {@link RMDelegationTokenIdentifierData}. It can handle both the current
* and old (non-protobuf) formats.
*
* @param fsIn The {@link DataInputStream} containing RM Delegation Token data
* @return An {@link RMDelegationTokenIdentifierData} containing the read in
* RM Delegation Token
*/
public static RMDelegationTokenIdentifierData
readRMDelegationTokenIdentifierData(DataInputStream fsIn)
throws IOException {
RMDelegationTokenIdentifierData identifierData =
new RMDelegationTokenIdentifierData();
try {
identifierData.readFields(fsIn);
} catch (InvalidProtocolBufferException e) {
LOG.warn("Recovering old formatted token");
fsIn.reset();
YARNDelegationTokenIdentifier identifier =
new RMDelegationTokenIdentifier();
identifier.readFieldsInOldFormat(fsIn);
identifierData.setIdentifier(identifier);
identifierData.setRenewDate(fsIn.readLong());
}
return identifierData;
}
}

View File

@ -668,8 +668,7 @@ public class ZKRMStateStore extends RMStateStore {
ByteArrayInputStream is = new ByteArrayInputStream(data);
try (DataInputStream fsIn = new DataInputStream(is)) {
RMDelegationTokenIdentifierData identifierData =
new RMDelegationTokenIdentifierData();
identifierData.readFields(fsIn);
RMStateStoreUtils.readRMDelegationTokenIdentifierData(fsIn);
RMDelegationTokenIdentifier identifier =
identifierData.getTokenIdentifier();
long renewDate = identifierData.getRenewDate();

View File

@ -58,4 +58,12 @@ public class RMDelegationTokenIdentifierData {
public long getRenewDate() {
return builder.getRenewDate();
}
public void setIdentifier(YARNDelegationTokenIdentifier identifier) {
builder.setTokenIdentifier(identifier.getProto());
}
public void setRenewDate(long renewDate) {
builder.setRenewDate(renewDate);
}
}

View File

@ -36,7 +36,11 @@ import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.delegation.TestDelegationToken;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDelegationTokenIdentifierData;
import org.junit.AfterClass;
import org.junit.Assert;
@ -352,6 +356,46 @@ public class TestClientRMTokens {
false);
}
@Test
public void testReadOldFormatFields() throws IOException {
RMDelegationTokenIdentifier token = new RMDelegationTokenIdentifier(
new Text("alice"), new Text("bob"), new Text("colin"));
token.setIssueDate(123);
token.setMasterKeyId(321);
token.setMaxDate(314);
token.setSequenceNumber(12345);
DataInputBuffer inBuf = new DataInputBuffer();
DataOutputBuffer outBuf = new DataOutputBuffer();
token.writeInOldFormat(outBuf);
outBuf.writeLong(42); // renewDate
inBuf.reset(outBuf.getData(), 0, outBuf.getLength());
RMDelegationTokenIdentifier identifier = null;
try {
RMDelegationTokenIdentifierData identifierData =
new RMDelegationTokenIdentifierData();
identifierData.readFields(inBuf);
fail("Should have thrown a "
+ InvalidProtocolBufferException.class.getName()
+ " because the token is not a protobuf");
} catch (InvalidProtocolBufferException e) {
identifier = new RMDelegationTokenIdentifier();
inBuf.reset();
identifier.readFieldsInOldFormat(inBuf);
assertEquals(42, inBuf.readLong());
}
assertEquals("alice", identifier.getUser().getUserName());
assertEquals(new Text("bob"), identifier.getRenewer());
assertEquals("colin", identifier.getUser().getRealUser().getUserName());
assertEquals(123, identifier.getIssueDate());
assertEquals(321, identifier.getMasterKeyId());
assertEquals(314, identifier.getMaxDate());
assertEquals(12345, identifier.getSequenceNumber());
}
@SuppressWarnings("unchecked")
private void checkShortCircuitRenewCancel(InetSocketAddress rmAddr,
InetSocketAddress serviceAddr,

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDelegationTokenIdentifierData;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class TestRMStateStoreUtils {
@Test
public void testReadRMDelegationTokenIdentifierData()
throws Exception {
testReadRMDelegationTokenIdentifierData(false);
}
@Test
public void testReadRMDelegationTokenIdentifierDataOldFormat()
throws Exception {
testReadRMDelegationTokenIdentifierData(true);
}
public void testReadRMDelegationTokenIdentifierData(boolean oldFormat)
throws Exception {
RMDelegationTokenIdentifier token = new RMDelegationTokenIdentifier(
new Text("alice"), new Text("bob"), new Text("colin"));
token.setIssueDate(123);
token.setMasterKeyId(321);
token.setMaxDate(314);
token.setSequenceNumber(12345);
DataInputBuffer inBuf = new DataInputBuffer();
if (oldFormat) {
DataOutputBuffer outBuf = new DataOutputBuffer();
token.writeInOldFormat(outBuf);
outBuf.writeLong(42); // renewDate
inBuf.reset(outBuf.getData(), 0, outBuf.getLength());
} else {
RMDelegationTokenIdentifierData tokenIdentifierData
= new RMDelegationTokenIdentifierData(token, 42);
byte[] data = tokenIdentifierData.toByteArray();
inBuf.reset(data, 0, data.length);
}
RMDelegationTokenIdentifierData identifierData
= RMStateStoreUtils.readRMDelegationTokenIdentifierData(inBuf);
assertEquals("Found unexpected data still in the InputStream",
-1, inBuf.read());
RMDelegationTokenIdentifier identifier
= identifierData.getTokenIdentifier();
assertEquals("alice", identifier.getUser().getUserName());
assertEquals(new Text("bob"), identifier.getRenewer());
assertEquals("colin", identifier.getUser().getRealUser().getUserName());
assertEquals(123, identifier.getIssueDate());
assertEquals(321, identifier.getMasterKeyId());
assertEquals(314, identifier.getMaxDate());
assertEquals(12345, identifier.getSequenceNumber());
assertEquals(42, identifierData.getRenewDate());
}
}