YARN-11294. [Federation] Router Support DelegationToken store/update/remove Token With MemoryStateStore. (#4915)

This commit is contained in:
slfan1989 2022-10-14 07:52:22 +08:00 committed by GitHub
parent 647457e6ab
commit 1962851356
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 1352 additions and 6 deletions

View File

@ -24,9 +24,11 @@ import java.io.IOException;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.YARNDelegationTokenIdentifierProto;
import org.apache.hadoop.yarn.util.Records;
@Private
public abstract class YARNDelegationTokenIdentifier extends
@ -112,4 +114,14 @@ public abstract class YARNDelegationTokenIdentifier extends
setBuilderFields();
return builder.build();
}
@Private
@Unstable
public static YARNDelegationTokenIdentifier newInstance(Text owner, Text renewer, Text realUser) {
YARNDelegationTokenIdentifier policy = Records.newRecord(YARNDelegationTokenIdentifier.class);
policy.setOwner(owner);
policy.setRenewer(renewer);
policy.setRenewer(realUser);
return policy;
}
}

View File

@ -0,0 +1,200 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.security.client.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.thirdparty.protobuf.TextFormat;
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.YARNDelegationTokenIdentifierProto;
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.YARNDelegationTokenIdentifierProtoOrBuilder;
import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier;
@Private
@Unstable
public class YARNDelegationTokenIdentifierPBImpl extends YARNDelegationTokenIdentifier {
private YARNDelegationTokenIdentifierProto proto =
YARNDelegationTokenIdentifierProto.getDefaultInstance();
private YARNDelegationTokenIdentifierProto.Builder builder = null;
private boolean viaProto = false;
public YARNDelegationTokenIdentifierPBImpl() {
builder = YARNDelegationTokenIdentifierProto.newBuilder();
}
public YARNDelegationTokenIdentifierPBImpl(YARNDelegationTokenIdentifierProto identifierProto) {
this.proto = identifierProto;
viaProto = true;
}
public YARNDelegationTokenIdentifierProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToProto() {
if (viaProto) {
maybeInitBuilder();
}
// mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
if (proto == null) {
proto = YARNDelegationTokenIdentifierProto.getDefaultInstance();
}
builder = YARNDelegationTokenIdentifierProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public Text getOwner() {
YARNDelegationTokenIdentifierProtoOrBuilder p = viaProto ? proto : builder;
return new Text(p.getOwner());
}
@Override
public void setOwner(Text owner) {
super.setOwner(owner);
maybeInitBuilder();
if (owner == null) {
builder.clearOwner();
return;
}
builder.setOwner(owner.toString());
}
@Override
public Text getRenewer() {
YARNDelegationTokenIdentifierProtoOrBuilder p = viaProto ? proto : builder;
return new Text(p.getRenewer());
}
@Override
public void setRenewer(Text renewer) {
super.setRenewer(renewer);
maybeInitBuilder();
if (renewer == null) {
builder.clearRenewer();
return;
}
builder.setOwner(renewer.toString());
}
@Override
public Text getRealUser() {
YARNDelegationTokenIdentifierProtoOrBuilder p = viaProto ? proto : builder;
return new Text(p.getRealUser());
}
@Override
public void setRealUser(Text realUser) {
super.setRealUser(realUser);
maybeInitBuilder();
if (realUser == null) {
builder.clearRealUser();
return;
}
builder.setRealUser(realUser.toString());
}
@Override
public void setIssueDate(long issueDate) {
super.setIssueDate(issueDate);
maybeInitBuilder();
builder.setIssueDate(issueDate);
}
@Override
public long getIssueDate() {
YARNDelegationTokenIdentifierProtoOrBuilder p = viaProto ? proto : builder;
return p.getIssueDate();
}
@Override
public void setMaxDate(long maxDate) {
super.setMaxDate(maxDate);
maybeInitBuilder();
builder.setMaxDate(maxDate);
}
@Override
public long getMaxDate() {
YARNDelegationTokenIdentifierProtoOrBuilder p = viaProto ? proto : builder;
return p.getMaxDate();
}
@Override
public void setSequenceNumber(int seqNum) {
super.setSequenceNumber(seqNum);
maybeInitBuilder();
builder.setSequenceNumber(seqNum);
}
@Override
public int getSequenceNumber() {
YARNDelegationTokenIdentifierProtoOrBuilder p = viaProto ? proto : builder;
return p.getSequenceNumber();
}
@Override
public void setMasterKeyId(int newId) {
super.setMasterKeyId(newId);
maybeInitBuilder();
builder.setMasterKeyId(newId);
}
@Override
public int getMasterKeyId() {
YARNDelegationTokenIdentifierProtoOrBuilder p = viaProto ? proto : builder;
return p.getMasterKeyId();
}
@Override
public Text getKind() {
return null;
}
@Override
public boolean equals(Object other) {
if (other == null) {
return false;
}
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
}

View File

@ -0,0 +1,20 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@Public
package org.apache.hadoop.yarn.security.client.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Public;

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.api;
import org.apache.commons.lang3.Range;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
@ -80,6 +81,8 @@ public class BasePBImplRecordsTest {
'a' + rand.nextInt(26));
} else if (type.equals(Float.class)) {
return rand.nextFloat();
} else if (type.equals(Text.class)) {
return new Text('a' + String.valueOf(rand.nextInt(1000000)));
} else if (type instanceof Class) {
Class clazz = (Class)type;
if (clazz.isArray()) {
@ -167,7 +170,7 @@ public class BasePBImplRecordsTest {
" does not have newInstance method");
}
Object [] args = new Object[paramTypes.length];
for (int i=0;i<args.length;i++) {
for (int i = 0; i < args.length; i++) {
args[i] = genTypeValue(paramTypes[i]);
}
ret = newInstance.invoke(null, args);

View File

@ -21,6 +21,8 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse;
import java.io.IOException;
@ -66,4 +68,48 @@ public interface FederationDelegationTokenStateStore {
*/
RouterMasterKeyResponse getMasterKeyByDelegationKey(RouterMasterKeyRequest request)
throws YarnException, IOException;
/**
* The Router Supports Store RMDelegationTokenIdentifier.
*
* @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate)
* @return routerRMTokenResponse.
* @throws YarnException if the call to the state store is unsuccessful
* @throws IOException An IO Error occurred
*/
RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request)
throws YarnException, IOException;
/**
* The Router Supports Update RMDelegationTokenIdentifier.
*
* @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate)
* @return RouterRMTokenResponse.
* @throws YarnException if the call to the state store is unsuccessful
* @throws IOException An IO Error occurred
*/
RouterRMTokenResponse updateStoredToken(RouterRMTokenRequest request)
throws YarnException, IOException;
/**
* The Router Supports Remove RMDelegationTokenIdentifier.
*
* @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate)
* @return RouterRMTokenResponse.
* @throws YarnException if the call to the state store is unsuccessful
* @throws IOException An IO Error occurred
*/
RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request)
throws YarnException, IOException;
/**
* The Router Supports GetTokenByRouterStoreToken.
*
* @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate)
* @return RouterRMTokenResponse.
* @throws YarnException if the call to the state store is unsuccessful
* @throws IOException An IO Error occurred
*/
RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest request)
throws YarnException, IOException;
}

View File

@ -33,6 +33,7 @@ import java.util.Comparator;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -83,6 +84,9 @@ import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMDTSecretManagerState;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationReservationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
@ -479,6 +483,71 @@ public class MemoryFederationStateStore implements FederationStateStore {
return RouterMasterKeyResponse.newInstance(resultRouterMasterKey);
}
@Override
public RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request)
throws YarnException, IOException {
RouterStoreToken storeToken = request.getRouterStoreToken();
RMDelegationTokenIdentifier tokenIdentifier =
(RMDelegationTokenIdentifier) storeToken.getTokenIdentifier();
Long renewDate = storeToken.getRenewDate();
storeOrUpdateRouterRMDT(tokenIdentifier, renewDate, false);
return RouterRMTokenResponse.newInstance(storeToken);
}
@Override
public RouterRMTokenResponse updateStoredToken(RouterRMTokenRequest request)
throws YarnException, IOException {
RouterStoreToken storeToken = request.getRouterStoreToken();
RMDelegationTokenIdentifier tokenIdentifier =
(RMDelegationTokenIdentifier) storeToken.getTokenIdentifier();
Long renewDate = storeToken.getRenewDate();
Map<RMDelegationTokenIdentifier, Long> rmDTState = routerRMSecretManagerState.getTokenState();
rmDTState.remove(tokenIdentifier);
storeOrUpdateRouterRMDT(tokenIdentifier, renewDate, true);
return RouterRMTokenResponse.newInstance(storeToken);
}
@Override
public RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request)
throws YarnException, IOException {
RouterStoreToken storeToken = request.getRouterStoreToken();
RMDelegationTokenIdentifier tokenIdentifier =
(RMDelegationTokenIdentifier) storeToken.getTokenIdentifier();
Map<RMDelegationTokenIdentifier, Long> rmDTState = routerRMSecretManagerState.getTokenState();
rmDTState.remove(tokenIdentifier);
return RouterRMTokenResponse.newInstance(storeToken);
}
@Override
public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest request)
throws YarnException, IOException {
RouterStoreToken storeToken = request.getRouterStoreToken();
RMDelegationTokenIdentifier tokenIdentifier =
(RMDelegationTokenIdentifier) storeToken.getTokenIdentifier();
Map<RMDelegationTokenIdentifier, Long> rmDTState = routerRMSecretManagerState.getTokenState();
if (!rmDTState.containsKey(tokenIdentifier)) {
LOG.info("Router RMDelegationToken: {} does not exist.", tokenIdentifier);
throw new IOException("Router RMDelegationToken: " + tokenIdentifier + " does not exist.");
}
RouterStoreToken resultToken =
RouterStoreToken.newInstance(tokenIdentifier, rmDTState.get(tokenIdentifier));
return RouterRMTokenResponse.newInstance(resultToken);
}
private void storeOrUpdateRouterRMDT(RMDelegationTokenIdentifier rmDTIdentifier,
Long renewDate, boolean isUpdate) throws IOException {
Map<RMDelegationTokenIdentifier, Long> rmDTState = routerRMSecretManagerState.getTokenState();
if (rmDTState.containsKey(rmDTIdentifier)) {
LOG.info("Error storing info for RMDelegationToken: {}.", rmDTIdentifier);
throw new IOException("Router RMDelegationToken: " + rmDTIdentifier + "is already stored.");
}
rmDTState.put(rmDTIdentifier, renewDate);
if (!isUpdate) {
routerRMSecretManagerState.setDtSequenceNumber(rmDTIdentifier.getSequenceNumber());
}
LOG.info("Store Router RM-RMDT with sequence number {}.", rmDTIdentifier.getSequenceNumber());
}
/**
* Get DelegationKey By based on MasterKey.
*

View File

@ -84,6 +84,8 @@ import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationH
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
@ -1427,4 +1429,28 @@ public class SQLFederationStateStore implements FederationStateStore {
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
}
@Override
public RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request)
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
}
@Override
public RouterRMTokenResponse updateStoredToken(RouterRMTokenRequest request)
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
}
@Override
public RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request)
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
}
@Override
public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest request)
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
}
}

View File

@ -82,6 +82,8 @@ import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationH
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRequest;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterIdPBImpl;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterInfoPBImpl;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterPolicyConfigurationPBImpl;
@ -901,4 +903,28 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
}
@Override
public RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request)
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
}
@Override
public RouterRMTokenResponse updateStoredToken(RouterRMTokenRequest request)
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
}
@Override
public RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request)
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
}
@Override
public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest request)
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
}
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.store.records;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
@Private
@Unstable
public abstract class RouterRMTokenRequest {
@Private
@Unstable
public static RouterRMTokenRequest newInstance(RouterStoreToken routerStoreToken) {
RouterRMTokenRequest request = Records.newRecord(RouterRMTokenRequest.class);
request.setRouterStoreToken(routerStoreToken);
return request;
}
@Public
@Unstable
public abstract RouterStoreToken getRouterStoreToken();
@Private
@Unstable
public abstract void setRouterStoreToken(RouterStoreToken routerStoreToken);
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.store.records;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
@Private
@Unstable
public abstract class RouterRMTokenResponse {
@Private
@Unstable
public static RouterRMTokenResponse newInstance(RouterStoreToken routerStoreToken) {
RouterRMTokenResponse request = Records.newRecord(RouterRMTokenResponse.class);
request.setRouterStoreToken(routerStoreToken);
return request;
}
@Public
@Unstable
public abstract RouterStoreToken getRouterStoreToken();
@Private
@Unstable
public abstract void setRouterStoreToken(RouterStoreToken routerStoreToken);
}

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.store.records;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier;
import org.apache.hadoop.yarn.util.Records;
import java.io.IOException;
@Private
@Unstable
public abstract class RouterStoreToken {
@Private
@Unstable
public static RouterStoreToken newInstance(YARNDelegationTokenIdentifier identifier,
Long renewdate) {
RouterStoreToken storeToken = Records.newRecord(RouterStoreToken.class);
storeToken.setIdentifier(identifier);
storeToken.setRenewDate(renewdate);
return storeToken;
}
@Private
@Unstable
public abstract YARNDelegationTokenIdentifier getTokenIdentifier() throws IOException;
@Private
@Unstable
public abstract void setIdentifier(YARNDelegationTokenIdentifier identifier);
@Private
@Unstable
public abstract Long getRenewDate();
@Private
@Unstable
public abstract void setRenewDate(Long renewDate);
}

View File

@ -0,0 +1,129 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.store.records.impl.pb;
import org.apache.hadoop.thirdparty.protobuf.TextFormat;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.RouterRMTokenRequestProto;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.RouterRMTokenRequestProtoOrBuilder;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.RouterStoreTokenProto;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken;
public class RouterRMTokenRequestPBImpl extends RouterRMTokenRequest {
private RouterRMTokenRequestProto proto = RouterRMTokenRequestProto.getDefaultInstance();
private RouterRMTokenRequestProto.Builder builder = null;
private boolean viaProto = false;
private RouterStoreToken routerStoreToken = null;
public RouterRMTokenRequestPBImpl() {
builder = RouterRMTokenRequestProto.newBuilder();
}
public RouterRMTokenRequestPBImpl(RouterRMTokenRequestProto requestProto) {
this.proto = requestProto;
viaProto = true;
}
public RouterRMTokenRequestProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToProto() {
if (viaProto) {
maybeInitBuilder();
}
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = RouterRMTokenRequestProto.newBuilder(proto);
}
viaProto = false;
}
private void mergeLocalToBuilder() {
if (this.routerStoreToken != null) {
RouterStoreTokenPBImpl routerStoreTokenPBImpl =
(RouterStoreTokenPBImpl) this.routerStoreToken;
RouterStoreTokenProto storeTokenProto = routerStoreTokenPBImpl.getProto();
if (!storeTokenProto.equals(builder.getRouterStoreToken())) {
builder.setRouterStoreToken(convertToProtoFormat(this.routerStoreToken));
}
}
}
@Override
public RouterStoreToken getRouterStoreToken() {
RouterRMTokenRequestProtoOrBuilder p = viaProto ? proto : builder;
if (this.routerStoreToken != null) {
return this.routerStoreToken;
}
if (!p.hasRouterStoreToken()) {
return null;
}
this.routerStoreToken = convertFromProtoFormat(p.getRouterStoreToken());
return this.routerStoreToken;
}
@Override
public void setRouterStoreToken(RouterStoreToken storeToken) {
maybeInitBuilder();
if (storeToken == null) {
builder.clearRouterStoreToken();
return;
}
this.routerStoreToken = storeToken;
this.builder.setRouterStoreToken(convertToProtoFormat(storeToken));
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null) {
return false;
}
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
private RouterStoreTokenProto convertToProtoFormat(RouterStoreToken storeToken) {
return ((RouterStoreTokenPBImpl) storeToken).getProto();
}
private RouterStoreToken convertFromProtoFormat(RouterStoreTokenProto storeTokenProto) {
return new RouterStoreTokenPBImpl(storeTokenProto);
}
}

View File

@ -0,0 +1,131 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.store.records.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.thirdparty.protobuf.TextFormat;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.RouterRMTokenResponseProto;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.RouterRMTokenResponseProtoOrBuilder;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.RouterStoreTokenProto;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken;
@Private
@Unstable
public class RouterRMTokenResponsePBImpl extends RouterRMTokenResponse {
private RouterRMTokenResponseProto proto = RouterRMTokenResponseProto.getDefaultInstance();
private RouterRMTokenResponseProto.Builder builder = null;
private boolean viaProto = false;
private RouterStoreToken routerStoreToken = null;
public RouterRMTokenResponsePBImpl() {
builder = RouterRMTokenResponseProto.newBuilder();
}
public RouterRMTokenResponsePBImpl(RouterRMTokenResponseProto requestProto) {
this.proto = requestProto;
viaProto = true;
}
public RouterRMTokenResponseProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToProto() {
if (viaProto) {
maybeInitBuilder();
}
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = RouterRMTokenResponseProto.newBuilder(proto);
}
viaProto = false;
}
private void mergeLocalToBuilder() {
if (this.routerStoreToken != null) {
RouterStoreTokenPBImpl routerStoreTokenPBImpl =
(RouterStoreTokenPBImpl) this.routerStoreToken;
RouterStoreTokenProto storeTokenProto = routerStoreTokenPBImpl.getProto();
if (!storeTokenProto.equals(builder.getRouterStoreToken())) {
builder.setRouterStoreToken(convertToProtoFormat(this.routerStoreToken));
}
}
}
@Override
public RouterStoreToken getRouterStoreToken() {
RouterRMTokenResponseProtoOrBuilder p = viaProto ? proto : builder;
if (this.routerStoreToken != null) {
return this.routerStoreToken;
}
if (!p.hasRouterStoreToken()) {
return null;
}
this.routerStoreToken = convertFromProtoFormat(p.getRouterStoreToken());
return this.routerStoreToken;
}
@Override
public void setRouterStoreToken(RouterStoreToken storeToken) {
maybeInitBuilder();
if (storeToken == null) {
builder.clearRouterStoreToken();
}
this.routerStoreToken = storeToken;
}
private RouterStoreTokenProto convertToProtoFormat(RouterStoreToken storeToken) {
return ((RouterStoreTokenPBImpl) storeToken).getProto();
}
private RouterStoreToken convertFromProtoFormat(RouterStoreTokenProto storeTokenProto) {
return new RouterStoreTokenPBImpl(storeTokenProto);
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null) {
return false;
}
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
}

View File

@ -0,0 +1,171 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.store.records.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.thirdparty.protobuf.TextFormat;
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.YARNDelegationTokenIdentifierProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.RouterStoreTokenProto;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.RouterStoreTokenProtoOrBuilder;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
/**
* Protocol buffer based implementation of {@link RouterStoreToken}.
*/
@Private
@Unstable
public class RouterStoreTokenPBImpl extends RouterStoreToken {
private RouterStoreTokenProto proto = RouterStoreTokenProto.getDefaultInstance();
private RouterStoreTokenProto.Builder builder = null;
private boolean viaProto = false;
private YARNDelegationTokenIdentifier rMDelegationTokenIdentifier = null;
private Long renewDate;
public RouterStoreTokenPBImpl() {
builder = RouterStoreTokenProto.newBuilder();
}
public RouterStoreTokenPBImpl(RouterStoreTokenProto storeTokenProto) {
this.proto = storeTokenProto;
viaProto = true;
}
public RouterStoreTokenProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToProto() {
if (viaProto) {
maybeInitBuilder();
}
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void mergeLocalToBuilder() {
if (this.rMDelegationTokenIdentifier != null) {
YARNDelegationTokenIdentifierProto idProto = this.rMDelegationTokenIdentifier.getProto();
if (!idProto.equals(builder.getTokenIdentifier())) {
builder.setTokenIdentifier(convertToProtoFormat(this.rMDelegationTokenIdentifier));
}
}
if (this.renewDate != null) {
builder.setRenewDate(this.renewDate);
}
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = RouterStoreTokenProto.newBuilder(proto);
}
viaProto = false;
}
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null) {
return false;
}
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
@Override
public YARNDelegationTokenIdentifier getTokenIdentifier() throws IOException {
RouterStoreTokenProtoOrBuilder p = viaProto ? proto : builder;
if (rMDelegationTokenIdentifier != null) {
return rMDelegationTokenIdentifier;
}
if(!p.hasTokenIdentifier()){
return null;
}
YARNDelegationTokenIdentifierProto identifierProto = p.getTokenIdentifier();
ByteArrayInputStream in = new ByteArrayInputStream(identifierProto.toByteArray());
RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier();
identifier.readFields(new DataInputStream(in));
this.rMDelegationTokenIdentifier = identifier;
return identifier;
}
@Override
public Long getRenewDate() {
RouterStoreTokenProtoOrBuilder p = viaProto ? proto : builder;
if (this.renewDate != null) {
return this.renewDate;
}
if (!p.hasRenewDate()) {
return null;
}
this.renewDate = p.getRenewDate();
return this.renewDate;
}
@Override
public void setIdentifier(YARNDelegationTokenIdentifier identifier) {
maybeInitBuilder();
if(identifier == null) {
builder.clearTokenIdentifier();
return;
}
this.rMDelegationTokenIdentifier = identifier;
this.builder.setTokenIdentifier(identifier.getProto());
}
@Override
public void setRenewDate(Long renewDate) {
maybeInitBuilder();
if(renewDate == null) {
builder.clearRenewDate();
return;
}
this.renewDate = renewDate;
this.builder.setRenewDate(renewDate);
}
private YARNDelegationTokenIdentifierProto convertToProtoFormat(
YARNDelegationTokenIdentifier delegationTokenIdentifier) {
return delegationTokenIdentifier.getProto();
}
}

View File

@ -80,6 +80,10 @@ import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationH
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey;
import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -778,4 +782,71 @@ public final class FederationStateStoreFacade {
RouterMasterKeyRequest keyRequest = RouterMasterKeyRequest.newInstance(masterKey);
return stateStore.getMasterKeyByDelegationKey(keyRequest);
}
/**
* The Router Supports Store RMDelegationTokenIdentifier{@link RMDelegationTokenIdentifier}.
*
* @param identifier delegation tokens from the RM
* @param renewDate renewDate
* @throws YarnException if the call to the state store is unsuccessful
* @throws IOException An IO Error occurred
*/
public void storeNewToken(RMDelegationTokenIdentifier identifier,
long renewDate) throws YarnException, IOException {
LOG.info("storing RMDelegation token with sequence number: {}.",
identifier.getSequenceNumber());
RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, renewDate);
RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken);
stateStore.storeNewToken(request);
}
/**
* The Router Supports Update RMDelegationTokenIdentifier{@link RMDelegationTokenIdentifier}.
*
* @param identifier delegation tokens from the RM
* @param renewDate renewDate
* @throws YarnException if the call to the state store is unsuccessful
* @throws IOException An IO Error occurred
*/
public void updateStoredToken(RMDelegationTokenIdentifier identifier,
long renewDate) throws YarnException, IOException {
LOG.info("updating RMDelegation token with sequence number: {}.",
identifier.getSequenceNumber());
RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, renewDate);
RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken);
stateStore.updateStoredToken(request);
}
/**
* The Router Supports Remove RMDelegationTokenIdentifier{@link RMDelegationTokenIdentifier}.
*
* @param identifier delegation tokens from the RM
* @throws YarnException if the call to the state store is unsuccessful
* @throws IOException An IO Error occurred
*/
public void removeStoredToken(RMDelegationTokenIdentifier identifier)
throws YarnException, IOException{
LOG.info("removing RMDelegation token with sequence number: {}.",
identifier.getSequenceNumber());
RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, 0L);
RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken);
stateStore.removeStoredToken(request);
}
/**
* The Router Supports GetTokenByRouterStoreToken{@link RMDelegationTokenIdentifier}.
*
* @param identifier delegation tokens from the RM
* @return RouterStoreToken
* @throws YarnException if the call to the state store is unsuccessful
* @throws IOException An IO Error occurred
*/
public RouterRMTokenResponse getTokenByRouterStoreToken(RMDelegationTokenIdentifier identifier)
throws YarnException, IOException {
LOG.info("get RouterStoreToken token with sequence number: {}.",
identifier.getSequenceNumber());
RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, 0L);
RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken);
return stateStore.getTokenByRouterStoreToken(request);
}
}

View File

@ -26,12 +26,14 @@ import java.util.HashSet;
import java.util.TimeZone;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
@ -74,6 +76,9 @@ import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationH
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.junit.After;
import org.junit.Assert;
@ -922,4 +927,107 @@ public abstract class FederationStateStoreBaseTest {
Assert.assertEquals(routerMasterKey.getKeyBytes(), routerMasterKeyResp.getKeyBytes());
Assert.assertEquals(routerMasterKey.getExpiryDate(), routerMasterKeyResp.getExpiryDate());
}
@Test
public void testStoreNewToken() throws IOException, YarnException {
// prepare parameters
RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier(
new Text("owner1"), new Text("renewer1"), new Text("realuser1"));
int sequenceNumber = 1;
identifier.setSequenceNumber(sequenceNumber);
Long renewDate = Time.now();
// store new rm-token
RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, renewDate);
RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken);
RouterRMTokenResponse routerRMTokenResponse = stateStore.storeNewToken(request);
// Verify the returned result to ensure that the returned Response is not empty
// and the returned result is consistent with the input parameters.
Assert.assertNotNull(routerRMTokenResponse);
RouterStoreToken storeTokenResp = routerRMTokenResponse.getRouterStoreToken();
Assert.assertNotNull(storeTokenResp);
Assert.assertEquals(storeToken.getRenewDate(), storeTokenResp.getRenewDate());
Assert.assertEquals(storeToken.getTokenIdentifier(), storeTokenResp.getTokenIdentifier());
}
@Test
public void testUpdateStoredToken() throws IOException, YarnException {
// prepare saveToken parameters
RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier(
new Text("owner2"), new Text("renewer2"), new Text("realuser2"));
int sequenceNumber = 2;
identifier.setSequenceNumber(sequenceNumber);
Long renewDate = Time.now();
// store new rm-token
RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, renewDate);
RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken);
RouterRMTokenResponse routerRMTokenResponse = stateStore.storeNewToken(request);
Assert.assertNotNull(routerRMTokenResponse);
// prepare updateToken parameters
Long renewDate2 = Time.now();
int sequenceNumber2 = 3;
identifier.setSequenceNumber(sequenceNumber2);
// update rm-token
RouterStoreToken updateToken = RouterStoreToken.newInstance(identifier, renewDate2);
RouterRMTokenRequest updateTokenRequest = RouterRMTokenRequest.newInstance(updateToken);
RouterRMTokenResponse updateTokenResponse = stateStore.updateStoredToken(updateTokenRequest);
Assert.assertNotNull(updateTokenResponse);
RouterStoreToken updateTokenResp = updateTokenResponse.getRouterStoreToken();
Assert.assertNotNull(updateTokenResp);
Assert.assertEquals(updateToken.getRenewDate(), updateTokenResp.getRenewDate());
Assert.assertEquals(updateToken.getTokenIdentifier(), updateTokenResp.getTokenIdentifier());
}
@Test
public void testRemoveStoredToken() throws IOException, YarnException {
// prepare saveToken parameters
RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier(
new Text("owner3"), new Text("renewer3"), new Text("realuser3"));
int sequenceNumber = 3;
identifier.setSequenceNumber(sequenceNumber);
Long renewDate = Time.now();
// store new rm-token
RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, renewDate);
RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken);
RouterRMTokenResponse routerRMTokenResponse = stateStore.storeNewToken(request);
Assert.assertNotNull(routerRMTokenResponse);
// remove rm-token
RouterRMTokenResponse removeTokenResponse = stateStore.removeStoredToken(request);
Assert.assertNotNull(removeTokenResponse);
RouterStoreToken removeTokenResp = removeTokenResponse.getRouterStoreToken();
Assert.assertNotNull(removeTokenResp);
Assert.assertEquals(removeTokenResp.getRenewDate(), storeToken.getRenewDate());
Assert.assertEquals(removeTokenResp.getTokenIdentifier(), storeToken.getTokenIdentifier());
}
@Test
public void testGetTokenByRouterStoreToken() throws IOException, YarnException {
// prepare saveToken parameters
RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier(
new Text("owner4"), new Text("renewer4"), new Text("realuser4"));
int sequenceNumber = 4;
identifier.setSequenceNumber(sequenceNumber);
Long renewDate = Time.now();
// store new rm-token
RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, renewDate);
RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken);
RouterRMTokenResponse routerRMTokenResponse = stateStore.storeNewToken(request);
Assert.assertNotNull(routerRMTokenResponse);
// getTokenByRouterStoreToken
RouterRMTokenResponse getRouterRMTokenResp = stateStore.getTokenByRouterStoreToken(request);
Assert.assertNotNull(getRouterRMTokenResp);
RouterStoreToken getStoreTokenResp = getRouterRMTokenResp.getRouterStoreToken();
Assert.assertNotNull(getStoreTokenResp);
Assert.assertEquals(getStoreTokenResp.getRenewDate(), storeToken.getRenewDate());
Assert.assertEquals(getStoreTokenResp.getTokenIdentifier(), storeToken.getTokenIdentifier());
}
}

View File

@ -572,4 +572,24 @@ public class TestSQLFederationStateStore extends FederationStateStoreBaseTest {
public void testRemoveStoredMasterKey() throws YarnException, IOException {
super.testRemoveStoredMasterKey();
}
@Test(expected = NotImplementedException.class)
public void testStoreNewToken() throws IOException, YarnException {
super.testStoreNewToken();
}
@Test(expected = NotImplementedException.class)
public void testUpdateStoredToken() throws IOException, YarnException {
super.testUpdateStoredToken();
}
@Test(expected = NotImplementedException.class)
public void testRemoveStoredToken() throws IOException, YarnException {
super.testRemoveStoredToken();
}
@Test(expected = NotImplementedException.class)
public void testGetTokenByRouterStoreToken() throws IOException, YarnException {
super.testGetTokenByRouterStoreToken();
}
}

View File

@ -185,4 +185,24 @@ public class TestZookeeperFederationStateStore
public void testRemoveStoredMasterKey() throws YarnException, IOException {
super.testRemoveStoredMasterKey();
}
@Test(expected = NotImplementedException.class)
public void testStoreNewToken() throws IOException, YarnException {
super.testStoreNewToken();
}
@Test(expected = NotImplementedException.class)
public void testUpdateStoredToken() throws IOException, YarnException {
super.testUpdateStoredToken();
}
@Test(expected = NotImplementedException.class)
public void testRemoveStoredToken() throws IOException, YarnException {
super.testRemoveStoredToken();
}
@Test(expected = NotImplementedException.class)
public void testGetTokenByRouterStoreToken() throws IOException, YarnException {
super.testGetTokenByRouterStoreToken();
}
}

View File

@ -48,6 +48,9 @@ import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClu
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterRegisterResponseProto;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.UpdateApplicationHomeSubClusterRequestProto;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.UpdateApplicationHomeSubClusterResponseProto;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.RouterStoreTokenProto;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.RouterRMTokenRequestProto;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.RouterRMTokenResponseProto;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.GetReservationHomeSubClusterRequestProto;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.RouterMasterKeyProto;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.RouterMasterKeyRequestProto;
@ -84,6 +87,10 @@ import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.UpdateAppl
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.RouterMasterKeyPBImpl;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.RouterMasterKeyRequestPBImpl;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.RouterMasterKeyResponsePBImpl;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.RouterStoreTokenPBImpl;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.RouterRMTokenRequestPBImpl;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.RouterRMTokenResponsePBImpl;
import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.ApplicationHomeSubClusterPBImpl;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.GetReservationHomeSubClusterRequestPBImpl;
import org.apache.hadoop.yarn.server.records.Version;
@ -104,6 +111,8 @@ public class TestFederationProtocolRecords extends BasePBImplRecordsTest {
generateByNewInstance(ApplicationHomeSubCluster.class);
generateByNewInstance(SubClusterPolicyConfiguration.class);
generateByNewInstance(RouterMasterKey.class);
generateByNewInstance(YARNDelegationTokenIdentifier.class);
generateByNewInstance(RouterStoreToken.class);
generateByNewInstance(ReservationId.class);
}
@ -291,6 +300,21 @@ public class TestFederationProtocolRecords extends BasePBImplRecordsTest {
validatePBImplRecord(RouterMasterKeyResponsePBImpl.class, RouterMasterKeyResponseProto.class);
}
@Test
public void testRouterStoreToken() throws Exception {
validatePBImplRecord(RouterStoreTokenPBImpl.class, RouterStoreTokenProto.class);
}
@Test
public void testRouterRMTokenRequest() throws Exception {
validatePBImplRecord(RouterRMTokenRequestPBImpl.class, RouterRMTokenRequestProto.class);
}
@Test
public void testRouterRMTokenResponse() throws Exception {
validatePBImplRecord(RouterRMTokenResponsePBImpl.class, RouterRMTokenResponseProto.class);
}
@Test
public void testApplicationHomeSubCluster() throws Exception {
validatePBImplRecord(ApplicationHomeSubClusterPBImpl.class,
@ -302,4 +326,4 @@ public class TestFederationProtocolRecords extends BasePBImplRecordsTest {
validatePBImplRecord(GetReservationHomeSubClusterRequestPBImpl.class,
GetReservationHomeSubClusterRequestProto.class);
}
}
}

View File

@ -26,10 +26,15 @@ import java.util.Set;
import java.util.HashSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
@ -37,6 +42,9 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMDTSecretManagerState;
import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -269,4 +277,97 @@ public class TestFederationStateStoreFacade {
federationStateStore.getRouterRMSecretManagerState();
Assert.assertEquals(keySet, secretManagerState.getMasterKeyState());
}
@Test
public void testStoreNewToken() throws YarnException, IOException {
// store new rm-token
RMDelegationTokenIdentifier dtId1 = new RMDelegationTokenIdentifier(
new Text("owner1"), new Text("renewer1"), new Text("realuser1"));
int sequenceNumber = 1;
dtId1.setSequenceNumber(sequenceNumber);
Long renewDate1 = Time.now();
facade.storeNewToken(dtId1, renewDate1);
// get RouterStoreToken from StateStore
RouterStoreToken routerStoreToken = RouterStoreToken.newInstance(dtId1, renewDate1);
RouterRMTokenRequest rmTokenRequest = RouterRMTokenRequest.newInstance(routerStoreToken);
RouterRMTokenResponse rmTokenResponse = stateStore.getTokenByRouterStoreToken(rmTokenRequest);
Assert.assertNotNull(rmTokenResponse);
RouterStoreToken resultStoreToken = rmTokenResponse.getRouterStoreToken();
YARNDelegationTokenIdentifier resultTokenIdentifier = resultStoreToken.getTokenIdentifier();
Assert.assertNotNull(resultStoreToken);
Assert.assertNotNull(resultTokenIdentifier);
Assert.assertNotNull(resultStoreToken.getRenewDate());
Assert.assertEquals(dtId1, resultTokenIdentifier);
Assert.assertEquals(renewDate1, resultStoreToken.getRenewDate());
Assert.assertEquals(sequenceNumber, resultTokenIdentifier.getSequenceNumber());
}
@Test
public void testUpdateNewToken() throws YarnException, IOException {
// store new rm-token
RMDelegationTokenIdentifier dtId1 = new RMDelegationTokenIdentifier(
new Text("owner2"), new Text("renewer2"), new Text("realuser2"));
int sequenceNumber = 2;
dtId1.setSequenceNumber(sequenceNumber);
Long renewDate1 = Time.now();
facade.storeNewToken(dtId1, renewDate1);
Long renewDate2 = Time.now();
int sequenceNumber2 = 3;
dtId1.setSequenceNumber(sequenceNumber2);
facade.updateStoredToken(dtId1, renewDate2);
// get RouterStoreToken from StateStore
RouterStoreToken routerStoreToken = RouterStoreToken.newInstance(dtId1, renewDate1);
RouterRMTokenRequest rmTokenRequest = RouterRMTokenRequest.newInstance(routerStoreToken);
RouterRMTokenResponse rmTokenResponse = stateStore.getTokenByRouterStoreToken(rmTokenRequest);
Assert.assertNotNull(rmTokenResponse);
RouterStoreToken resultStoreToken = rmTokenResponse.getRouterStoreToken();
YARNDelegationTokenIdentifier resultTokenIdentifier = resultStoreToken.getTokenIdentifier();
Assert.assertNotNull(resultStoreToken);
Assert.assertNotNull(resultTokenIdentifier);
Assert.assertNotNull(resultStoreToken.getRenewDate());
Assert.assertEquals(dtId1, resultTokenIdentifier);
Assert.assertEquals(renewDate2, resultStoreToken.getRenewDate());
Assert.assertEquals(sequenceNumber2, resultTokenIdentifier.getSequenceNumber());
}
@Test
public void testRemoveStoredToken() throws Exception {
// store new rm-token
RMDelegationTokenIdentifier dtId1 = new RMDelegationTokenIdentifier(
new Text("owner3"), new Text("renewer3"), new Text("realuser3"));
int sequenceNumber = 3;
dtId1.setSequenceNumber(sequenceNumber);
Long renewDate1 = Time.now();
facade.storeNewToken(dtId1, renewDate1);
// get RouterStoreToken from StateStore
RouterStoreToken routerStoreToken = RouterStoreToken.newInstance(dtId1, renewDate1);
RouterRMTokenRequest rmTokenRequest = RouterRMTokenRequest.newInstance(routerStoreToken);
RouterRMTokenResponse rmTokenResponse = stateStore.getTokenByRouterStoreToken(rmTokenRequest);
Assert.assertNotNull(rmTokenResponse);
RouterStoreToken resultStoreToken = rmTokenResponse.getRouterStoreToken();
YARNDelegationTokenIdentifier resultTokenIdentifier = resultStoreToken.getTokenIdentifier();
Assert.assertNotNull(resultStoreToken);
Assert.assertNotNull(resultTokenIdentifier);
Assert.assertNotNull(resultStoreToken.getRenewDate());
Assert.assertEquals(dtId1, resultTokenIdentifier);
Assert.assertEquals(renewDate1, resultStoreToken.getRenewDate());
Assert.assertEquals(sequenceNumber, resultTokenIdentifier.getSequenceNumber());
// remove rm-token
facade.removeStoredToken(dtId1);
// Call again(getTokenByRouterStoreToken) after remove will throw IOException(not exist)
LambdaTestUtils.intercept(IOException.class, "RMDelegationToken: " + dtId1 + " does not exist.",
() -> stateStore.getTokenByRouterStoreToken(rmTokenRequest));
}
}

View File

@ -26,7 +26,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.List;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.net.NetUtils;
@ -79,6 +78,8 @@ import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationH
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.records.Version;
@ -383,19 +384,43 @@ public class FederationStateStoreService extends AbstractService
@Override
public RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest request)
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
return stateStoreClient.storeNewMasterKey(request);
}
@Override
public RouterMasterKeyResponse removeStoredMasterKey(RouterMasterKeyRequest request)
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
return stateStoreClient.removeStoredMasterKey(request);
}
@Override
public RouterMasterKeyResponse getMasterKeyByDelegationKey(RouterMasterKeyRequest request)
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
return stateStoreClient.getMasterKeyByDelegationKey(request);
}
@Override
public RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request)
throws YarnException, IOException {
return stateStoreClient.storeNewToken(request);
}
@Override
public RouterRMTokenResponse updateStoredToken(RouterRMTokenRequest request)
throws YarnException, IOException {
return stateStoreClient.updateStoredToken(request);
}
@Override
public RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request)
throws YarnException, IOException {
return stateStoreClient.removeStoredToken(request);
}
@Override
public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest request)
throws YarnException, IOException {
return stateStoreClient.getTokenByRouterStoreToken(request);
}
/**