From dd8fc7e499200476f44a3d1d38127f5c47c9eefb Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Fri, 4 Jan 2013 20:15:32 +0000 Subject: [PATCH] YARN-50. Implement renewal / cancellation of Delegation Tokens(Siddharth Seth via tgraves) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1429085 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../hadoop/yarn/api/ClientRMProtocol.java | 38 ++- .../CancelDelegationTokenRequest.java | 34 ++ .../CancelDelegationTokenResponse.java | 31 ++ .../RenewDelegationTokenRequest.java | 34 ++ .../RenewDelegationTokenResponse.java | 32 ++ .../CancelDelegationTokenRequestPBImpl.java | 106 ++++++ .../CancelDelegationTokenResponsePBImpl.java | 44 +++ .../pb/RenewDelegationTokenRequestPBImpl.java | 107 ++++++ .../RenewDelegationTokenResponsePBImpl.java | 69 ++++ .../src/main/proto/client_RM_protocol.proto | 2 + .../src/main/proto/yarn_service_protos.proto | 16 + .../hadoop/yarn/client/YarnClientImpl.java | 51 ++- .../security/RMDelegationTokenRenewer.java | 83 +++++ ....apache.hadoop.security.token.TokenRenewer | 1 + .../client/ClientRMProtocolPBClientImpl.java | 37 ++ .../ClientRMProtocolPBServiceImpl.java | 38 +++ .../resourcemanager/ClientRMService.java | 100 +++++- .../resourcemanager/ResourceManager.java | 4 +- .../resourcemanager/TestClientRMTokens.java | 320 ++++++++++++++++++ 20 files changed, 1118 insertions(+), 32 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/CancelDelegationTokenRequest.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/CancelDelegationTokenResponse.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RenewDelegationTokenRequest.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RenewDelegationTokenResponse.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/CancelDelegationTokenRequestPBImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/CancelDelegationTokenResponsePBImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RenewDelegationTokenRequestPBImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RenewDelegationTokenResponsePBImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/security/RMDelegationTokenRenewer.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 3fb85e48b26..ae97922dced 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -270,6 +270,9 @@ Release 0.23.6 - UNRELEASED YARN-293. Node Manager leaks LocalizerRunner object for every Container (Robert Joseph Evans via jlowe) + YARN-50. Implement renewal / cancellation of Delegation Tokens + (Siddharth Seth via tgraves) + Release 0.23.5 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java index c20b24eab47..8a19dad2566 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java @@ -18,13 +18,11 @@ package org.apache.hadoop.yarn.api; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; - -import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; -import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; -import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; @@ -33,19 +31,25 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.DelegationToken; +import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; @@ -265,4 +269,26 @@ public GetQueueUserAclsInfoResponse getQueueUserAcls( public GetDelegationTokenResponse getDelegationToken( GetDelegationTokenRequest request) throws YarnRemoteException; + + /** + * Renew an existing delegation token. + * + * @param request the delegation token to be renewed. + * @return the new expiry time for the delegation token. + * @throws YarnRemoteException + */ + @Private + public RenewDelegationTokenResponse renewDelegationToken( + RenewDelegationTokenRequest request) throws YarnRemoteException; + + /** + * Cancel an existing delegation token. + * + * @param request the delegation token to be cancelled. + * @return an empty response. + * @throws YarnRemoteException + */ + @Private + public CancelDelegationTokenResponse cancelDelegationToken( + CancelDelegationTokenRequest request) throws YarnRemoteException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/CancelDelegationTokenRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/CancelDelegationTokenRequest.java new file mode 100644 index 00000000000..d9832ba73ba --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/CancelDelegationTokenRequest.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.yarn.api.records.DelegationToken; + +/** + * The request issued by the client to the {@code ResourceManager} to cancel a + * delegation token. + */ +@Public +@Evolving +public interface CancelDelegationTokenRequest { + DelegationToken getDelegationToken(); + void setDelegationToken(DelegationToken dToken); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/CancelDelegationTokenResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/CancelDelegationTokenResponse.java new file mode 100644 index 00000000000..c934291d72c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/CancelDelegationTokenResponse.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +/** + * The response from the {@code ResourceManager} to a cancelDelegationToken + * request. + */ +@Public +@Evolving +public interface CancelDelegationTokenResponse { +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RenewDelegationTokenRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RenewDelegationTokenRequest.java new file mode 100644 index 00000000000..b89f18d9317 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RenewDelegationTokenRequest.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.yarn.api.records.DelegationToken; + +/** + * The request issued by the client to renew a delegation token from + * the {@code ResourceManager}. + */ +@Public +@Evolving +public interface RenewDelegationTokenRequest { + DelegationToken getDelegationToken(); + void setDelegationToken(DelegationToken dToken); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RenewDelegationTokenResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RenewDelegationTokenResponse.java new file mode 100644 index 00000000000..91158a52003 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RenewDelegationTokenResponse.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +/** + * The response to a renewDelegationToken call to the {@code ResourceManager}. + */ +@Public +@Evolving +public interface RenewDelegationTokenResponse { + long getNextExpirationTime(); + void setNextExpirationTime(long expTime); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/CancelDelegationTokenRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/CancelDelegationTokenRequestPBImpl.java new file mode 100644 index 00000000000..cd88c5a79c9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/CancelDelegationTokenRequestPBImpl.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.records.DelegationToken; +import org.apache.hadoop.yarn.api.records.ProtoBase; +import org.apache.hadoop.yarn.api.records.impl.pb.DelegationTokenPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.DelegationTokenProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.CancelDelegationTokenRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.CancelDelegationTokenRequestProtoOrBuilder; + +public class CancelDelegationTokenRequestPBImpl extends + ProtoBase implements + CancelDelegationTokenRequest { + + CancelDelegationTokenRequestProto proto = CancelDelegationTokenRequestProto + .getDefaultInstance(); + CancelDelegationTokenRequestProto.Builder builder = null; + boolean viaProto = false; + + public CancelDelegationTokenRequestPBImpl() { + builder = CancelDelegationTokenRequestProto.newBuilder(); + } + + public CancelDelegationTokenRequestPBImpl( + CancelDelegationTokenRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + DelegationToken token; + + @Override + public DelegationToken getDelegationToken() { + CancelDelegationTokenRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.token != null) { + return this.token; + } + if (!p.hasDelegationToken()) { + return null; + } + this.token = convertFromProtoFormat(p.getDelegationToken()); + return this.token; + } + + @Override + public void setDelegationToken(DelegationToken token) { + maybeInitBuilder(); + if (token == null) + builder.clearDelegationToken(); + this.token = token; + } + + @Override + public CancelDelegationTokenRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void mergeLocalToBuilder() { + if (token != null) { + builder.setDelegationToken(convertToProtoFormat(this.token)); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = CancelDelegationTokenRequestProto.newBuilder(proto); + } + viaProto = false; + } + + private DelegationTokenPBImpl convertFromProtoFormat(DelegationTokenProto p) { + return new DelegationTokenPBImpl(p); + } + + private DelegationTokenProto convertToProtoFormat(DelegationToken t) { + return ((DelegationTokenPBImpl) t).getProto(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/CancelDelegationTokenResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/CancelDelegationTokenResponsePBImpl.java new file mode 100644 index 00000000000..14908c7bbcc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/CancelDelegationTokenResponsePBImpl.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.records.ProtoBase; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.CancelDelegationTokenResponseProto; + +public class CancelDelegationTokenResponsePBImpl extends + ProtoBase implements + CancelDelegationTokenResponse { + + CancelDelegationTokenResponseProto proto = CancelDelegationTokenResponseProto + .getDefaultInstance(); + + public CancelDelegationTokenResponsePBImpl() { + } + + public CancelDelegationTokenResponsePBImpl( + CancelDelegationTokenResponseProto proto) { + this.proto = proto; + } + + @Override + public CancelDelegationTokenResponseProto getProto() { + return proto; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RenewDelegationTokenRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RenewDelegationTokenRequestPBImpl.java new file mode 100644 index 00000000000..27fc08c6147 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RenewDelegationTokenRequestPBImpl.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.records.DelegationToken; +import org.apache.hadoop.yarn.api.records.ProtoBase; +import org.apache.hadoop.yarn.api.records.impl.pb.DelegationTokenPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.DelegationTokenProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.RenewDelegationTokenRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.RenewDelegationTokenRequestProtoOrBuilder; + +public class RenewDelegationTokenRequestPBImpl extends + ProtoBase implements + RenewDelegationTokenRequest { + RenewDelegationTokenRequestProto proto = + RenewDelegationTokenRequestProto.getDefaultInstance(); + RenewDelegationTokenRequestProto.Builder builder = null; + boolean viaProto = false; + + public RenewDelegationTokenRequestPBImpl() { + builder = RenewDelegationTokenRequestProto.newBuilder(); + } + + public RenewDelegationTokenRequestPBImpl ( + RenewDelegationTokenRequestProto proto) { + this.proto = proto; + this.viaProto = true; + } + + DelegationToken token; + + @Override + public DelegationToken getDelegationToken() { + RenewDelegationTokenRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.token != null) { + return this.token; + } + if (!p.hasDelegationToken()) { + return null; + } + this.token = convertFromProtoFormat(p.getDelegationToken()); + return this.token; + } + + @Override + public void setDelegationToken(DelegationToken token) { + maybeInitBuilder(); + if (token == null) + builder.clearDelegationToken(); + this.token = token; + } + + @Override + public RenewDelegationTokenRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + + private void mergeLocalToBuilder() { + if (token != null) { + builder.setDelegationToken(convertToProtoFormat(this.token)); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = RenewDelegationTokenRequestProto.newBuilder(proto); + } + viaProto = false; + } + + + private DelegationTokenPBImpl convertFromProtoFormat(DelegationTokenProto p) { + return new DelegationTokenPBImpl(p); + } + + private DelegationTokenProto convertToProtoFormat(DelegationToken t) { + return ((DelegationTokenPBImpl)t).getProto(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RenewDelegationTokenResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RenewDelegationTokenResponsePBImpl.java new file mode 100644 index 00000000000..b5c80f199c9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RenewDelegationTokenResponsePBImpl.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.records.ProtoBase; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.RenewDelegationTokenResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.RenewDelegationTokenResponseProtoOrBuilder; + +public class RenewDelegationTokenResponsePBImpl extends + ProtoBase implements + RenewDelegationTokenResponse { + + RenewDelegationTokenResponseProto proto = + RenewDelegationTokenResponseProto.getDefaultInstance(); + RenewDelegationTokenResponseProto.Builder builder = null; + boolean viaProto = false; + + public RenewDelegationTokenResponsePBImpl() { + this.builder = RenewDelegationTokenResponseProto.newBuilder(); + } + + public RenewDelegationTokenResponsePBImpl ( + RenewDelegationTokenResponseProto proto) { + this.proto = proto; + this.viaProto = true; + } + + @Override + public RenewDelegationTokenResponseProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = RenewDelegationTokenResponseProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public long getNextExpirationTime() { + RenewDelegationTokenResponseProtoOrBuilder p = viaProto ? proto : builder; + return p.getNextExpiryTs(); + } + + @Override + public void setNextExpirationTime(long expTime) { + maybeInitBuilder(); + builder.setNextExpiryTs(expTime); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_RM_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_RM_protocol.proto index edc6de69489..7495ce8784f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_RM_protocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_RM_protocol.proto @@ -34,5 +34,7 @@ service ClientRMProtocolService { rpc getQueueInfo (GetQueueInfoRequestProto) returns (GetQueueInfoResponseProto); rpc getQueueUserAcls (GetQueueUserAclsInfoRequestProto) returns (GetQueueUserAclsInfoResponseProto); rpc getDelegationToken(GetDelegationTokenRequestProto) returns (GetDelegationTokenResponseProto); + rpc renewDelegationToken(RenewDelegationTokenRequestProto) returns (RenewDelegationTokenResponseProto); + rpc cancelDelegationToken(CancelDelegationTokenRequestProto) returns (CancelDelegationTokenResponseProto); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index a97c4e4ffd3..47bb124332a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -147,6 +147,22 @@ message GetDelegationTokenResponseProto { optional DelegationTokenProto application_token = 1; } +message RenewDelegationTokenRequestProto { + required DelegationTokenProto delegation_token = 1; +} + +message RenewDelegationTokenResponseProto { + required int64 next_expiry_ts = 1; +} + +message CancelDelegationTokenRequestProto { + required DelegationTokenProto delegation_token = 1; +} + +message CancelDelegationTokenResponseProto { +} + + ////////////////////////////////////////////////////// /////// client_NM_Protocol /////////////////////////// ////////////////////////////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java index e8815c62060..14994f97a7b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java @@ -25,11 +25,13 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.yarn.api.ClientRMProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; @@ -45,6 +47,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; @@ -72,7 +76,12 @@ public class YarnClientImpl extends AbstractService implements YarnClient { private static final String ROOT = "root"; public YarnClientImpl() { + this(null); + } + + public YarnClientImpl(InetSocketAddress rmAddress) { super(YarnClientImpl.class.getName()); + this.rmAddress = rmAddress; } private static InetSocketAddress getRmAddress(Configuration conf) { @@ -82,7 +91,9 @@ private static InetSocketAddress getRmAddress(Configuration conf) { @Override public synchronized void init(Configuration conf) { - this.rmAddress = getRmAddress(conf); + if (this.rmAddress == null) { + this.rmAddress = getRmAddress(conf); + } super.init(conf); } @@ -90,16 +101,19 @@ public synchronized void init(Configuration conf) { public synchronized void start() { YarnRPC rpc = YarnRPC.create(getConfig()); - this.rmClient = - (ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, rmAddress, - getConfig()); - LOG.debug("Connecting to ResourceManager at " + rmAddress); + this.rmClient = (ClientRMProtocol) rpc.getProxy( + ClientRMProtocol.class, rmAddress, getConfig()); + if (LOG.isDebugEnabled()) { + LOG.debug("Connecting to ResourceManager at " + rmAddress); + } super.start(); } @Override public synchronized void stop() { - RPC.stopProxy(this.rmClient); + if (this.rmClient != null) { + RPC.stopProxy(this.rmClient); + } super.stop(); } @@ -184,6 +198,31 @@ public DelegationToken getRMDelegationToken(Text renewer) return response.getRMDelegationToken(); } + + // Not part of YarnClient itself. Placed in YarnClientImpl while renew/cancel + // are part of ClientRMProtocol. + @Private + public long renewRMDelegationToken(DelegationToken rmToken) + throws YarnRemoteException { + RenewDelegationTokenRequest request = Records + .newRecord(RenewDelegationTokenRequest.class); + request.setDelegationToken(rmToken); + RenewDelegationTokenResponse response = rmClient + .renewDelegationToken(request); + return response.getNextExpirationTime(); + } + + // Not part of YarnClient itself. Placed in YarnClientImpl while renew/cancel + // are part of ClietnRMProtocol + @Private + public void cancelRMDelegationToken(DelegationToken rmToken) + throws YarnRemoteException { + CancelDelegationTokenRequest request = Records + .newRecord(CancelDelegationTokenRequest.class); + request.setDelegationToken(rmToken); + rmClient.cancelDelegationToken(request); + } + private GetQueueInfoRequest getQueueInfoRequest(String queueName, boolean includeApplications, boolean includeChildQueues, boolean recursive) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/security/RMDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/security/RMDelegationTokenRenewer.java new file mode 100644 index 00000000000..3f1caeec183 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/security/RMDelegationTokenRenewer.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.security; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenRenewer; +import org.apache.hadoop.yarn.api.records.DelegationToken; +import org.apache.hadoop.yarn.client.YarnClientImpl; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.util.BuilderUtils; + +public class RMDelegationTokenRenewer extends TokenRenewer { + + @Override + public boolean handleKind(Text kind) { + return RMDelegationTokenIdentifier.KIND_NAME.equals(kind); + } + + @Override + public boolean isManaged(Token token) throws IOException { + return true; + } + + @Override + public long renew(Token token, Configuration conf) throws IOException, + InterruptedException { + YarnClientImpl yarnClient = getYarnClient(conf, + SecurityUtil.getTokenServiceAddr(token)); + try { + DelegationToken dToken = BuilderUtils.newDelegationToken( + token.getIdentifier(), token.getKind().toString(), + token.getPassword(), token.getService().toString()); + return yarnClient.renewRMDelegationToken(dToken); + } finally { + yarnClient.stop(); + } + } + + @Override + public void cancel(Token token, Configuration conf) throws IOException, + InterruptedException { + YarnClientImpl yarnClient = getYarnClient(conf, + SecurityUtil.getTokenServiceAddr(token)); + try { + DelegationToken dToken = BuilderUtils.newDelegationToken( + token.getIdentifier(), token.getKind().toString(), + token.getPassword(), token.getService().toString()); + yarnClient.cancelRMDelegationToken(dToken); + return; + } finally { + yarnClient.stop(); + } + } + + private YarnClientImpl getYarnClient(Configuration conf, + InetSocketAddress rmAddress) { + YarnClientImpl yarnClient = new YarnClientImpl(rmAddress); + yarnClient.init(conf); + yarnClient.start(); + return yarnClient; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer new file mode 100644 index 00000000000..6625a9cfe53 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer @@ -0,0 +1 @@ +org.apache.hadoop.yarn.security.RMDelegationTokenRenewer; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientRMProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientRMProtocolPBClientImpl.java index 0f2bf7a34fb..6302725250e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientRMProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientRMProtocolPBClientImpl.java @@ -27,6 +27,8 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.yarn.api.ClientRMProtocol; import org.apache.hadoop.yarn.api.ClientRMProtocolPB; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; @@ -45,8 +47,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationReportRequestPBImpl; @@ -65,10 +71,13 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.CancelDelegationTokenRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationReportRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterMetricsRequestProto; @@ -78,6 +87,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueInfoRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.RenewDelegationTokenRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto; import com.google.protobuf.ServiceException; @@ -233,4 +243,31 @@ public GetDelegationTokenResponse getDelegationToken( throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e); } } + + @Override + public RenewDelegationTokenResponse renewDelegationToken( + RenewDelegationTokenRequest request) throws YarnRemoteException { + RenewDelegationTokenRequestProto requestProto = + ((RenewDelegationTokenRequestPBImpl) request).getProto(); + try { + return new RenewDelegationTokenResponsePBImpl(proxy.renewDelegationToken( + null, requestProto)); + } catch (ServiceException e) { + throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e); + } + } + + @Override + public CancelDelegationTokenResponse cancelDelegationToken( + CancelDelegationTokenRequest request) throws YarnRemoteException { + CancelDelegationTokenRequestProto requestProto = + ((CancelDelegationTokenRequestPBImpl) request).getProto(); + try { + return new CancelDelegationTokenResponsePBImpl( + proxy.cancelDelegationToken(null, requestProto)); + + } catch (ServiceException e) { + throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientRMProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientRMProtocolPBServiceImpl.java index 2f0e89c5c2f..9eecdad68a6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientRMProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientRMProtocolPBServiceImpl.java @@ -20,6 +20,7 @@ import org.apache.hadoop.yarn.api.ClientRMProtocol; import org.apache.hadoop.yarn.api.ClientRMProtocolPB; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; @@ -29,7 +30,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationReportRequestPBImpl; @@ -48,9 +52,13 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.CancelDelegationTokenRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.CancelDelegationTokenResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationReportRequestProto; @@ -69,6 +77,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.RenewDelegationTokenRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.RenewDelegationTokenResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto; @@ -212,4 +222,32 @@ public GetDelegationTokenResponseProto getDelegationToken( throw new ServiceException(e); } } + + @Override + public RenewDelegationTokenResponseProto renewDelegationToken( + RpcController controller, RenewDelegationTokenRequestProto proto) + throws ServiceException { + RenewDelegationTokenRequestPBImpl request = + new RenewDelegationTokenRequestPBImpl(proto); + try { + RenewDelegationTokenResponse response = real.renewDelegationToken(request); + return ((RenewDelegationTokenResponsePBImpl)response).getProto(); + } catch (YarnRemoteException e) { + throw new ServiceException(e); + } + } + + @Override + public CancelDelegationTokenResponseProto cancelDelegationToken( + RpcController controller, CancelDelegationTokenRequestProto proto) + throws ServiceException { + CancelDelegationTokenRequestPBImpl request = + new CancelDelegationTokenRequestPBImpl(proto); + try { + CancelDelegationTokenResponse response = real.cancelDelegationToken(request); + return ((CancelDelegationTokenResponsePBImpl)response).getProto(); + } catch (YarnRemoteException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index e8bd5d03114..c111e5c57e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -23,6 +23,7 @@ import java.security.AccessControlException; import java.util.ArrayList; import java.util.Collection; +import java.util.EnumSet; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -39,6 +40,8 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.ClientRMProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; @@ -57,12 +60,15 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.DelegationToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; @@ -87,6 +93,7 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.util.BuilderUtils; +import org.apache.hadoop.yarn.util.Records; /** @@ -105,7 +112,7 @@ public class ClientRMService extends AbstractService implements private final RMAppManager rmAppManager; private Server server; - private RMDelegationTokenSecretManager rmDTSecretManager; + protected RMDelegationTokenSecretManager rmDTSecretManager; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); InetSocketAddress clientBindAddress; @@ -122,16 +129,13 @@ public ClientRMService(RMContext rmContext, YarnScheduler scheduler, this.applicationsACLsManager = applicationACLsManager; this.rmDTSecretManager = rmDTSecretManager; } - + @Override public void init(Configuration conf) { - clientBindAddress = conf.getSocketAddr( - YarnConfiguration.RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_PORT); + clientBindAddress = getBindAddress(conf); super.init(conf); } - + @Override public void start() { Configuration conf = getConfig(); @@ -156,6 +160,20 @@ public void start() { super.start(); } + @Override + public void stop() { + if (this.server != null) { + this.server.stop(); + } + super.stop(); + } + + InetSocketAddress getBindAddress(Configuration conf) { + return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_PORT); + } + @Private public InetSocketAddress getBindAddress() { return clientBindAddress; @@ -455,10 +473,7 @@ public GetDelegationTokenResponse getDelegationToken( try { // Verify that the connection is kerberos authenticated - AuthenticationMethod authMethod = UserGroupInformation - .getRealAuthenticationMethod(UserGroupInformation.getCurrentUser()); - if (UserGroupInformation.isSecurityEnabled() - && (authMethod != AuthenticationMethod.KERBEROS)) { + if (!isAllowedDelegationTokenOp()) { throw new IOException( "Delegation Token can be issued only with kerberos authentication"); } @@ -490,17 +505,66 @@ public GetDelegationTokenResponse getDelegationToken( } } + @Override + public RenewDelegationTokenResponse renewDelegationToken( + RenewDelegationTokenRequest request) throws YarnRemoteException { + try { + if (!isAllowedDelegationTokenOp()) { + throw new IOException( + "Delegation Token can be renewed only with kerberos authentication"); + } + + DelegationToken protoToken = request.getDelegationToken(); + Token token = new Token( + protoToken.getIdentifier().array(), protoToken.getPassword().array(), + new Text(protoToken.getKind()), new Text(protoToken.getService())); + + String user = UserGroupInformation.getCurrentUser().getShortUserName(); + long nextExpTime = rmDTSecretManager.renewToken(token, user); + RenewDelegationTokenResponse renewResponse = Records + .newRecord(RenewDelegationTokenResponse.class); + renewResponse.setNextExpirationTime(nextExpTime); + return renewResponse; + } catch (IOException e) { + throw RPCUtil.getRemoteException(e); + } + } + + @Override + public CancelDelegationTokenResponse cancelDelegationToken( + CancelDelegationTokenRequest request) throws YarnRemoteException { + try { + if (!isAllowedDelegationTokenOp()) { + throw new IOException( + "Delegation Token can be cancelled only with kerberos authentication"); + } + DelegationToken protoToken = request.getDelegationToken(); + Token token = new Token( + protoToken.getIdentifier().array(), protoToken.getPassword().array(), + new Text(protoToken.getKind()), new Text(protoToken.getService())); + + String user = UserGroupInformation.getCurrentUser().getShortUserName(); + rmDTSecretManager.cancelToken(token, user); + return Records.newRecord(CancelDelegationTokenResponse.class); + } catch (IOException e) { + throw RPCUtil.getRemoteException(e); + } + } + void refreshServiceAcls(Configuration configuration, PolicyProvider policyProvider) { this.server.refreshServiceAcl(configuration, policyProvider); } - - @Override - public void stop() { - if (this.server != null) { - this.server.stop(); + + private boolean isAllowedDelegationTokenOp() throws IOException { + if (UserGroupInformation.isSecurityEnabled()) { + return EnumSet.of(AuthenticationMethod.KERBEROS, + AuthenticationMethod.KERBEROS_SSL, + AuthenticationMethod.CERTIFICATE) + .contains(UserGroupInformation.getCurrentUser() + .getRealAuthenticationMethod()); + } else { + return true; } - super.stop(); } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index e196770837e..aa96944fc6b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -47,10 +47,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; @@ -66,8 +66,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager; -import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java new file mode 100644 index 00000000000..e2a961d50c5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java @@ -0,0 +1,320 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.net.InetSocketAddress; +import java.security.PrivilegedAction; +import java.security.PrivilegedExceptionAction; + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; +import org.apache.hadoop.yarn.api.ClientRMProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.records.DelegationToken; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.util.BuilderUtils; +import org.apache.hadoop.yarn.util.ProtoUtils; +import org.apache.hadoop.yarn.util.Records; +import org.junit.Test; + + +public class TestClientRMTokens { + + private static final Log LOG = LogFactory.getLog(TestClientRMTokens.class); + + + @Test + public void testDelegationToken() throws IOException, InterruptedException { + + final YarnConfiguration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RM_PRINCIPAL, "testuser/localhost@apache.org"); + + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + UserGroupInformation.setConfiguration(conf); + + ResourceScheduler scheduler = createMockScheduler(conf); + + long initialInterval = 10000l; + long maxLifetime= 20000l; + long renewInterval = 10000l; + + RMDelegationTokenSecretManager rmDtSecretManager = createRMDelegationTokenSecretManager( + initialInterval, maxLifetime, renewInterval); + rmDtSecretManager.startThreads(); + LOG.info("Creating DelegationTokenSecretManager with initialInterval: " + + initialInterval + ", maxLifetime: " + maxLifetime + + ", renewInterval: " + renewInterval); + + final ClientRMService clientRMService = new ClientRMServiceForTest(conf, + scheduler, rmDtSecretManager); + clientRMService.init(conf); + clientRMService.start(); + + ClientRMProtocol clientRMWithDT = null; + try { + + // Create a user for the renewr and fake the authentication-method + UserGroupInformation loggedInUser = UserGroupInformation + .createRemoteUser("testrenewer@APACHE.ORG"); + Assert.assertEquals("testrenewer", loggedInUser.getShortUserName()); + // Default realm is APACHE.ORG + loggedInUser.setAuthenticationMethod(AuthenticationMethod.KERBEROS); + + + DelegationToken token = getDelegationToken(loggedInUser, clientRMService, + loggedInUser.getShortUserName()); + long tokenFetchTime = System.currentTimeMillis(); + LOG.info("Got delegation token at: " + tokenFetchTime); + + // Now try talking to RMService using the delegation token + clientRMWithDT = getClientRMProtocolWithDT(token, + clientRMService.getBindAddress(), "loginuser1", conf); + + GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class); + + try { + clientRMWithDT.getNewApplication(request); + } catch (UndeclaredThrowableException e) { + fail("Unexpected exception" + e); + } + + // Renew after 50% of token age. + while(System.currentTimeMillis() < tokenFetchTime + initialInterval / 2) { + Thread.sleep(500l); + } + long nextExpTime = renewDelegationToken(loggedInUser, clientRMService, token); + long renewalTime = System.currentTimeMillis(); + LOG.info("Renewed token at: " + renewalTime + ", NextExpiryTime: " + + nextExpTime); + + // Wait for first expiry, but before renewed expiry. + while (System.currentTimeMillis() > tokenFetchTime + initialInterval + && System.currentTimeMillis() < nextExpTime) { + Thread.sleep(500l); + } + Thread.sleep(50l); + + // Valid token because of renewal. + try { + clientRMWithDT.getNewApplication(request); + } catch (UndeclaredThrowableException e) { + fail("Unexpected exception" + e); + } + + // Wait for expiry. + while(System.currentTimeMillis() < renewalTime + renewInterval) { + Thread.sleep(500l); + } + Thread.sleep(50l); + LOG.info("At time: " + System.currentTimeMillis() + ", token should be invalid"); + // Token should have expired. + try { + clientRMWithDT.getNewApplication(request); + fail("Should not have succeeded with an expired token"); + } catch (UndeclaredThrowableException e) { + assertTrue(e.getCause().getMessage().contains("is expired")); + } + + // Test cancellation + // Stop the existing proxy, start another. + if (clientRMWithDT != null) { + RPC.stopProxy(clientRMWithDT); + clientRMWithDT = null; + } + token = getDelegationToken(loggedInUser, clientRMService, + loggedInUser.getShortUserName()); + tokenFetchTime = System.currentTimeMillis(); + LOG.info("Got delegation token at: " + tokenFetchTime); + + // Now try talking to RMService using the delegation token + clientRMWithDT = getClientRMProtocolWithDT(token, + clientRMService.getBindAddress(), "loginuser2", conf); + + request = Records.newRecord(GetNewApplicationRequest.class); + + try { + clientRMWithDT.getNewApplication(request); + } catch (UndeclaredThrowableException e) { + fail("Unexpected exception" + e); + } + cancelDelegationToken(loggedInUser, clientRMService, token); + if (clientRMWithDT != null) { + RPC.stopProxy(clientRMWithDT); + clientRMWithDT = null; + } + + // Creating a new connection. + clientRMWithDT = getClientRMProtocolWithDT(token, + clientRMService.getBindAddress(), "loginuser2", conf); + LOG.info("Cancelled delegation token at: " + System.currentTimeMillis()); + // Verify cancellation worked. + try { + clientRMWithDT.getNewApplication(request); + fail("Should not have succeeded with a cancelled delegation token"); + } catch (UndeclaredThrowableException e) { + } + + + + } finally { + rmDtSecretManager.stopThreads(); + // TODO PRECOMMIT Close proxies. + if (clientRMWithDT != null) { + RPC.stopProxy(clientRMWithDT); + } + } + + } + + // Get the delegation token directly as it is a little difficult to setup + // the kerberos based rpc. + private DelegationToken getDelegationToken( + final UserGroupInformation loggedInUser, + final ClientRMProtocol clientRMService, final String renewerString) + throws IOException, InterruptedException { + DelegationToken token = loggedInUser + .doAs(new PrivilegedExceptionAction() { + @Override + public DelegationToken run() throws YarnRemoteException { + GetDelegationTokenRequest request = Records + .newRecord(GetDelegationTokenRequest.class); + request.setRenewer(renewerString); + return clientRMService.getDelegationToken(request) + .getRMDelegationToken(); + } + }); + return token; + } + + private long renewDelegationToken(final UserGroupInformation loggedInUser, + final ClientRMProtocol clientRMService, final DelegationToken dToken) + throws IOException, InterruptedException { + long nextExpTime = loggedInUser.doAs(new PrivilegedExceptionAction() { + @Override + public Long run() throws YarnRemoteException { + RenewDelegationTokenRequest request = Records + .newRecord(RenewDelegationTokenRequest.class); + request.setDelegationToken(dToken); + return clientRMService.renewDelegationToken(request) + .getNextExpirationTime(); + } + }); + return nextExpTime; + } + + private void cancelDelegationToken(final UserGroupInformation loggedInUser, + final ClientRMProtocol clientRMService, final DelegationToken dToken) + throws IOException, InterruptedException { + loggedInUser.doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws YarnRemoteException { + CancelDelegationTokenRequest request = Records + .newRecord(CancelDelegationTokenRequest.class); + request.setDelegationToken(dToken); + clientRMService.cancelDelegationToken(request); + return null; + } + }); + } + + private ClientRMProtocol getClientRMProtocolWithDT(DelegationToken token, + final InetSocketAddress rmAddress, String user, final Configuration conf) { + // Maybe consider converting to Hadoop token, serialize de-serialize etc + // before trying to renew the token. + + UserGroupInformation ugi = UserGroupInformation + .createRemoteUser(user); + ugi.addToken(ProtoUtils.convertFromProtoFormat(token, rmAddress)); + + final YarnRPC rpc = YarnRPC.create(conf); + ClientRMProtocol clientRMWithDT = ugi + .doAs(new PrivilegedAction() { + @Override + public ClientRMProtocol run() { + return (ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, + rmAddress, conf); + } + }); + return clientRMWithDT; + } + + class ClientRMServiceForTest extends ClientRMService { + + public ClientRMServiceForTest(Configuration conf, + ResourceScheduler scheduler, + RMDelegationTokenSecretManager rmDTSecretManager) { + super(mock(RMContext.class), scheduler, mock(RMAppManager.class), + new ApplicationACLsManager(conf), rmDTSecretManager); + } + + // Use a random port unless explicitly specified. + @Override + InetSocketAddress getBindAddress(Configuration conf) { + return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADDRESS, 0); + } + + @Override + public void stop() { + if (rmDTSecretManager != null) { + rmDTSecretManager.stopThreads(); + } + super.stop(); + } + + + } + + private static ResourceScheduler createMockScheduler(Configuration conf) { + ResourceScheduler mockSched = mock(ResourceScheduler.class); + doReturn(BuilderUtils.newResource(512)).when(mockSched) + .getMinimumResourceCapability(); + doReturn(BuilderUtils.newResource(5120)).when(mockSched) + .getMaximumResourceCapability(); + return mockSched; + } + + private static RMDelegationTokenSecretManager createRMDelegationTokenSecretManager( + long secretKeyInterval, long tokenMaxLifetime, long tokenRenewInterval) { + RMDelegationTokenSecretManager rmDtSecretManager = new RMDelegationTokenSecretManager( + secretKeyInterval, tokenMaxLifetime, tokenRenewInterval, 3600000); + return rmDtSecretManager; + } +}