YARN-50. Implement renewal / cancellation of Delegation Tokens(Siddharth Seth via tgraves)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1429104 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas Graves 2013-01-04 20:31:12 +00:00
parent ec4fc07c52
commit 5596a8cb11
20 changed files with 1118 additions and 32 deletions

View File

@ -253,6 +253,9 @@ Release 0.23.6 - UNRELEASED
YARN-293. Node Manager leaks LocalizerRunner object for every Container
(Robert Joseph Evans via jlowe)
YARN-50. Implement renewal / cancellation of Delegation Tokens
(Siddharth Seth via tgraves)
Release 0.23.5 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -18,13 +18,11 @@
package org.apache.hadoop.yarn.api;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@ -33,19 +31,25 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
@ -265,4 +269,26 @@ public interface ClientRMProtocol {
public GetDelegationTokenResponse getDelegationToken(
GetDelegationTokenRequest request)
throws YarnRemoteException;
/**
* Renew an existing delegation token.
*
* @param request the delegation token to be renewed.
* @return the new expiry time for the delegation token.
* @throws YarnRemoteException
*/
@Private
public RenewDelegationTokenResponse renewDelegationToken(
RenewDelegationTokenRequest request) throws YarnRemoteException;
/**
* Cancel an existing delegation token.
*
* @param request the delegation token to be cancelled.
* @return an empty response.
* @throws YarnRemoteException
*/
@Private
public CancelDelegationTokenResponse cancelDelegationToken(
CancelDelegationTokenRequest request) throws YarnRemoteException;
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.api.records.DelegationToken;
/**
* The request issued by the client to the {@code ResourceManager} to cancel a
* delegation token.
*/
@Public
@Evolving
public interface CancelDelegationTokenRequest {
DelegationToken getDelegationToken();
void setDelegationToken(DelegationToken dToken);
}

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
/**
* The response from the {@code ResourceManager} to a cancelDelegationToken
* request.
*/
@Public
@Evolving
public interface CancelDelegationTokenResponse {
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.api.records.DelegationToken;
/**
* The request issued by the client to renew a delegation token from
* the {@code ResourceManager}.
*/
@Public
@Evolving
public interface RenewDelegationTokenRequest {
DelegationToken getDelegationToken();
void setDelegationToken(DelegationToken dToken);
}

View File

@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
/**
* The response to a renewDelegationToken call to the {@code ResourceManager}.
*/
@Public
@Evolving
public interface RenewDelegationTokenResponse {
long getNextExpirationTime();
void setNextExpirationTime(long expTime);
}

View File

@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.DelegationTokenPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.DelegationTokenProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.CancelDelegationTokenRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.CancelDelegationTokenRequestProtoOrBuilder;
public class CancelDelegationTokenRequestPBImpl extends
ProtoBase<CancelDelegationTokenRequestProto> implements
CancelDelegationTokenRequest {
CancelDelegationTokenRequestProto proto = CancelDelegationTokenRequestProto
.getDefaultInstance();
CancelDelegationTokenRequestProto.Builder builder = null;
boolean viaProto = false;
public CancelDelegationTokenRequestPBImpl() {
builder = CancelDelegationTokenRequestProto.newBuilder();
}
public CancelDelegationTokenRequestPBImpl(
CancelDelegationTokenRequestProto proto) {
this.proto = proto;
viaProto = true;
}
DelegationToken token;
@Override
public DelegationToken getDelegationToken() {
CancelDelegationTokenRequestProtoOrBuilder p = viaProto ? proto : builder;
if (this.token != null) {
return this.token;
}
if (!p.hasDelegationToken()) {
return null;
}
this.token = convertFromProtoFormat(p.getDelegationToken());
return this.token;
}
@Override
public void setDelegationToken(DelegationToken token) {
maybeInitBuilder();
if (token == null)
builder.clearDelegationToken();
this.token = token;
}
@Override
public CancelDelegationTokenRequestProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToBuilder() {
if (token != null) {
builder.setDelegationToken(convertToProtoFormat(this.token));
}
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = CancelDelegationTokenRequestProto.newBuilder(proto);
}
viaProto = false;
}
private DelegationTokenPBImpl convertFromProtoFormat(DelegationTokenProto p) {
return new DelegationTokenPBImpl(p);
}
private DelegationTokenProto convertToProtoFormat(DelegationToken t) {
return ((DelegationTokenPBImpl) t).getProto();
}
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.CancelDelegationTokenResponseProto;
public class CancelDelegationTokenResponsePBImpl extends
ProtoBase<CancelDelegationTokenResponseProto> implements
CancelDelegationTokenResponse {
CancelDelegationTokenResponseProto proto = CancelDelegationTokenResponseProto
.getDefaultInstance();
public CancelDelegationTokenResponsePBImpl() {
}
public CancelDelegationTokenResponsePBImpl(
CancelDelegationTokenResponseProto proto) {
this.proto = proto;
}
@Override
public CancelDelegationTokenResponseProto getProto() {
return proto;
}
}

View File

@ -0,0 +1,107 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.DelegationTokenPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.DelegationTokenProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RenewDelegationTokenRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RenewDelegationTokenRequestProtoOrBuilder;
public class RenewDelegationTokenRequestPBImpl extends
ProtoBase<RenewDelegationTokenRequestProto> implements
RenewDelegationTokenRequest {
RenewDelegationTokenRequestProto proto =
RenewDelegationTokenRequestProto.getDefaultInstance();
RenewDelegationTokenRequestProto.Builder builder = null;
boolean viaProto = false;
public RenewDelegationTokenRequestPBImpl() {
builder = RenewDelegationTokenRequestProto.newBuilder();
}
public RenewDelegationTokenRequestPBImpl (
RenewDelegationTokenRequestProto proto) {
this.proto = proto;
this.viaProto = true;
}
DelegationToken token;
@Override
public DelegationToken getDelegationToken() {
RenewDelegationTokenRequestProtoOrBuilder p = viaProto ? proto : builder;
if (this.token != null) {
return this.token;
}
if (!p.hasDelegationToken()) {
return null;
}
this.token = convertFromProtoFormat(p.getDelegationToken());
return this.token;
}
@Override
public void setDelegationToken(DelegationToken token) {
maybeInitBuilder();
if (token == null)
builder.clearDelegationToken();
this.token = token;
}
@Override
public RenewDelegationTokenRequestProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToBuilder() {
if (token != null) {
builder.setDelegationToken(convertToProtoFormat(this.token));
}
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = RenewDelegationTokenRequestProto.newBuilder(proto);
}
viaProto = false;
}
private DelegationTokenPBImpl convertFromProtoFormat(DelegationTokenProto p) {
return new DelegationTokenPBImpl(p);
}
private DelegationTokenProto convertToProtoFormat(DelegationToken t) {
return ((DelegationTokenPBImpl)t).getProto();
}
}

View File

@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RenewDelegationTokenResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RenewDelegationTokenResponseProtoOrBuilder;
public class RenewDelegationTokenResponsePBImpl extends
ProtoBase<RenewDelegationTokenResponseProto> implements
RenewDelegationTokenResponse {
RenewDelegationTokenResponseProto proto =
RenewDelegationTokenResponseProto.getDefaultInstance();
RenewDelegationTokenResponseProto.Builder builder = null;
boolean viaProto = false;
public RenewDelegationTokenResponsePBImpl() {
this.builder = RenewDelegationTokenResponseProto.newBuilder();
}
public RenewDelegationTokenResponsePBImpl (
RenewDelegationTokenResponseProto proto) {
this.proto = proto;
this.viaProto = true;
}
@Override
public RenewDelegationTokenResponseProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = RenewDelegationTokenResponseProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public long getNextExpirationTime() {
RenewDelegationTokenResponseProtoOrBuilder p = viaProto ? proto : builder;
return p.getNextExpiryTs();
}
@Override
public void setNextExpirationTime(long expTime) {
maybeInitBuilder();
builder.setNextExpiryTs(expTime);
}
}

View File

@ -34,5 +34,7 @@ service ClientRMProtocolService {
rpc getQueueInfo (GetQueueInfoRequestProto) returns (GetQueueInfoResponseProto);
rpc getQueueUserAcls (GetQueueUserAclsInfoRequestProto) returns (GetQueueUserAclsInfoResponseProto);
rpc getDelegationToken(GetDelegationTokenRequestProto) returns (GetDelegationTokenResponseProto);
rpc renewDelegationToken(RenewDelegationTokenRequestProto) returns (RenewDelegationTokenResponseProto);
rpc cancelDelegationToken(CancelDelegationTokenRequestProto) returns (CancelDelegationTokenResponseProto);
}

View File

@ -147,6 +147,22 @@ message GetDelegationTokenResponseProto {
optional DelegationTokenProto application_token = 1;
}
message RenewDelegationTokenRequestProto {
required DelegationTokenProto delegation_token = 1;
}
message RenewDelegationTokenResponseProto {
required int64 next_expiry_ts = 1;
}
message CancelDelegationTokenRequestProto {
required DelegationTokenProto delegation_token = 1;
}
message CancelDelegationTokenResponseProto {
}
//////////////////////////////////////////////////////
/////// client_NM_Protocol ///////////////////////////
//////////////////////////////////////////////////////

View File

@ -25,11 +25,13 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@ -45,6 +47,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@ -72,7 +76,12 @@ public class YarnClientImpl extends AbstractService implements YarnClient {
private static final String ROOT = "root";
public YarnClientImpl() {
this(null);
}
public YarnClientImpl(InetSocketAddress rmAddress) {
super(YarnClientImpl.class.getName());
this.rmAddress = rmAddress;
}
private static InetSocketAddress getRmAddress(Configuration conf) {
@ -82,7 +91,9 @@ public class YarnClientImpl extends AbstractService implements YarnClient {
@Override
public synchronized void init(Configuration conf) {
this.rmAddress = getRmAddress(conf);
if (this.rmAddress == null) {
this.rmAddress = getRmAddress(conf);
}
super.init(conf);
}
@ -90,16 +101,19 @@ public class YarnClientImpl extends AbstractService implements YarnClient {
public synchronized void start() {
YarnRPC rpc = YarnRPC.create(getConfig());
this.rmClient =
(ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, rmAddress,
getConfig());
LOG.debug("Connecting to ResourceManager at " + rmAddress);
this.rmClient = (ClientRMProtocol) rpc.getProxy(
ClientRMProtocol.class, rmAddress, getConfig());
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to ResourceManager at " + rmAddress);
}
super.start();
}
@Override
public synchronized void stop() {
RPC.stopProxy(this.rmClient);
if (this.rmClient != null) {
RPC.stopProxy(this.rmClient);
}
super.stop();
}
@ -184,6 +198,31 @@ public class YarnClientImpl extends AbstractService implements YarnClient {
return response.getRMDelegationToken();
}
// Not part of YarnClient itself. Placed in YarnClientImpl while renew/cancel
// are part of ClientRMProtocol.
@Private
public long renewRMDelegationToken(DelegationToken rmToken)
throws YarnRemoteException {
RenewDelegationTokenRequest request = Records
.newRecord(RenewDelegationTokenRequest.class);
request.setDelegationToken(rmToken);
RenewDelegationTokenResponse response = rmClient
.renewDelegationToken(request);
return response.getNextExpirationTime();
}
// Not part of YarnClient itself. Placed in YarnClientImpl while renew/cancel
// are part of ClietnRMProtocol
@Private
public void cancelRMDelegationToken(DelegationToken rmToken)
throws YarnRemoteException {
CancelDelegationTokenRequest request = Records
.newRecord(CancelDelegationTokenRequest.class);
request.setDelegationToken(rmToken);
rmClient.cancelDelegationToken(request);
}
private GetQueueInfoRequest
getQueueInfoRequest(String queueName, boolean includeApplications,
boolean includeChildQueues, boolean recursive) {

View File

@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.security;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.client.YarnClientImpl;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.util.BuilderUtils;
public class RMDelegationTokenRenewer extends TokenRenewer {
@Override
public boolean handleKind(Text kind) {
return RMDelegationTokenIdentifier.KIND_NAME.equals(kind);
}
@Override
public boolean isManaged(Token<?> token) throws IOException {
return true;
}
@Override
public long renew(Token<?> token, Configuration conf) throws IOException,
InterruptedException {
YarnClientImpl yarnClient = getYarnClient(conf,
SecurityUtil.getTokenServiceAddr(token));
try {
DelegationToken dToken = BuilderUtils.newDelegationToken(
token.getIdentifier(), token.getKind().toString(),
token.getPassword(), token.getService().toString());
return yarnClient.renewRMDelegationToken(dToken);
} finally {
yarnClient.stop();
}
}
@Override
public void cancel(Token<?> token, Configuration conf) throws IOException,
InterruptedException {
YarnClientImpl yarnClient = getYarnClient(conf,
SecurityUtil.getTokenServiceAddr(token));
try {
DelegationToken dToken = BuilderUtils.newDelegationToken(
token.getIdentifier(), token.getKind().toString(),
token.getPassword(), token.getService().toString());
yarnClient.cancelRMDelegationToken(dToken);
return;
} finally {
yarnClient.stop();
}
}
private YarnClientImpl getYarnClient(Configuration conf,
InetSocketAddress rmAddress) {
YarnClientImpl yarnClient = new YarnClientImpl(rmAddress);
yarnClient.init(conf);
yarnClient.start();
return yarnClient;
}
}

View File

@ -0,0 +1 @@
org.apache.hadoop.yarn.security.RMDelegationTokenRenewer;

View File

@ -27,6 +27,8 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.ClientRMProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@ -45,8 +47,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationReportRequestPBImpl;
@ -65,10 +71,13 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoRe
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.CancelDelegationTokenRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationReportRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterMetricsRequestProto;
@ -78,6 +87,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNewApplicationRequestPr
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueInfoRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RenewDelegationTokenRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto;
import com.google.protobuf.ServiceException;
@ -233,4 +243,31 @@ public class ClientRMProtocolPBClientImpl implements ClientRMProtocol,
throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
}
}
@Override
public RenewDelegationTokenResponse renewDelegationToken(
RenewDelegationTokenRequest request) throws YarnRemoteException {
RenewDelegationTokenRequestProto requestProto =
((RenewDelegationTokenRequestPBImpl) request).getProto();
try {
return new RenewDelegationTokenResponsePBImpl(proxy.renewDelegationToken(
null, requestProto));
} catch (ServiceException e) {
throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
}
}
@Override
public CancelDelegationTokenResponse cancelDelegationToken(
CancelDelegationTokenRequest request) throws YarnRemoteException {
CancelDelegationTokenRequestProto requestProto =
((CancelDelegationTokenRequestPBImpl) request).getProto();
try {
return new CancelDelegationTokenResponsePBImpl(
proxy.cancelDelegationToken(null, requestProto));
} catch (ServiceException e) {
throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
}
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.api.impl.pb.service;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.ClientRMProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
@ -29,7 +30,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationReportRequestPBImpl;
@ -48,9 +52,13 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoRe
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.CancelDelegationTokenRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.CancelDelegationTokenResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationReportRequestProto;
@ -69,6 +77,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoReques
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RenewDelegationTokenRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RenewDelegationTokenResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto;
@ -212,4 +222,32 @@ public class ClientRMProtocolPBServiceImpl implements ClientRMProtocolPB {
throw new ServiceException(e);
}
}
@Override
public RenewDelegationTokenResponseProto renewDelegationToken(
RpcController controller, RenewDelegationTokenRequestProto proto)
throws ServiceException {
RenewDelegationTokenRequestPBImpl request =
new RenewDelegationTokenRequestPBImpl(proto);
try {
RenewDelegationTokenResponse response = real.renewDelegationToken(request);
return ((RenewDelegationTokenResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) {
throw new ServiceException(e);
}
}
@Override
public CancelDelegationTokenResponseProto cancelDelegationToken(
RpcController controller, CancelDelegationTokenRequestProto proto)
throws ServiceException {
CancelDelegationTokenRequestPBImpl request =
new CancelDelegationTokenRequestPBImpl(proto);
try {
CancelDelegationTokenResponse response = real.cancelDelegationToken(request);
return ((CancelDelegationTokenResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) {
throw new ServiceException(e);
}
}
}

View File

@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
import java.security.AccessControlException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@ -39,6 +40,8 @@ import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@ -57,12 +60,15 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
@ -87,6 +93,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicy
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
/**
@ -105,7 +112,7 @@ public class ClientRMService extends AbstractService implements
private final RMAppManager rmAppManager;
private Server server;
private RMDelegationTokenSecretManager rmDTSecretManager;
protected RMDelegationTokenSecretManager rmDTSecretManager;
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
InetSocketAddress clientBindAddress;
@ -122,16 +129,13 @@ public class ClientRMService extends AbstractService implements
this.applicationsACLsManager = applicationACLsManager;
this.rmDTSecretManager = rmDTSecretManager;
}
@Override
public void init(Configuration conf) {
clientBindAddress = conf.getSocketAddr(
YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT);
clientBindAddress = getBindAddress(conf);
super.init(conf);
}
@Override
public void start() {
Configuration conf = getConfig();
@ -156,6 +160,20 @@ public class ClientRMService extends AbstractService implements
super.start();
}
@Override
public void stop() {
if (this.server != null) {
this.server.stop();
}
super.stop();
}
InetSocketAddress getBindAddress(Configuration conf) {
return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT);
}
@Private
public InetSocketAddress getBindAddress() {
return clientBindAddress;
@ -455,10 +473,7 @@ public class ClientRMService extends AbstractService implements
try {
// Verify that the connection is kerberos authenticated
AuthenticationMethod authMethod = UserGroupInformation
.getRealAuthenticationMethod(UserGroupInformation.getCurrentUser());
if (UserGroupInformation.isSecurityEnabled()
&& (authMethod != AuthenticationMethod.KERBEROS)) {
if (!isAllowedDelegationTokenOp()) {
throw new IOException(
"Delegation Token can be issued only with kerberos authentication");
}
@ -490,17 +505,66 @@ public class ClientRMService extends AbstractService implements
}
}
@Override
public RenewDelegationTokenResponse renewDelegationToken(
RenewDelegationTokenRequest request) throws YarnRemoteException {
try {
if (!isAllowedDelegationTokenOp()) {
throw new IOException(
"Delegation Token can be renewed only with kerberos authentication");
}
DelegationToken protoToken = request.getDelegationToken();
Token<RMDelegationTokenIdentifier> token = new Token<RMDelegationTokenIdentifier>(
protoToken.getIdentifier().array(), protoToken.getPassword().array(),
new Text(protoToken.getKind()), new Text(protoToken.getService()));
String user = UserGroupInformation.getCurrentUser().getShortUserName();
long nextExpTime = rmDTSecretManager.renewToken(token, user);
RenewDelegationTokenResponse renewResponse = Records
.newRecord(RenewDelegationTokenResponse.class);
renewResponse.setNextExpirationTime(nextExpTime);
return renewResponse;
} catch (IOException e) {
throw RPCUtil.getRemoteException(e);
}
}
@Override
public CancelDelegationTokenResponse cancelDelegationToken(
CancelDelegationTokenRequest request) throws YarnRemoteException {
try {
if (!isAllowedDelegationTokenOp()) {
throw new IOException(
"Delegation Token can be cancelled only with kerberos authentication");
}
DelegationToken protoToken = request.getDelegationToken();
Token<RMDelegationTokenIdentifier> token = new Token<RMDelegationTokenIdentifier>(
protoToken.getIdentifier().array(), protoToken.getPassword().array(),
new Text(protoToken.getKind()), new Text(protoToken.getService()));
String user = UserGroupInformation.getCurrentUser().getShortUserName();
rmDTSecretManager.cancelToken(token, user);
return Records.newRecord(CancelDelegationTokenResponse.class);
} catch (IOException e) {
throw RPCUtil.getRemoteException(e);
}
}
void refreshServiceAcls(Configuration configuration,
PolicyProvider policyProvider) {
this.server.refreshServiceAcl(configuration, policyProvider);
}
@Override
public void stop() {
if (this.server != null) {
this.server.stop();
private boolean isAllowedDelegationTokenOp() throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
return EnumSet.of(AuthenticationMethod.KERBEROS,
AuthenticationMethod.KERBEROS_SSL,
AuthenticationMethod.CERTIFICATE)
.contains(UserGroupInformation.getCurrentUser()
.getRealAuthenticationMethod());
} else {
return true;
}
super.stop();
}
}

View File

@ -47,10 +47,10 @@ import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@ -66,8 +66,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;

View File

@ -0,0 +1,320 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ProtoUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Test;
public class TestClientRMTokens {
private static final Log LOG = LogFactory.getLog(TestClientRMTokens.class);
@Test
public void testDelegationToken() throws IOException, InterruptedException {
final YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.RM_PRINCIPAL, "testuser/localhost@apache.org");
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
UserGroupInformation.setConfiguration(conf);
ResourceScheduler scheduler = createMockScheduler(conf);
long initialInterval = 10000l;
long maxLifetime= 20000l;
long renewInterval = 10000l;
RMDelegationTokenSecretManager rmDtSecretManager = createRMDelegationTokenSecretManager(
initialInterval, maxLifetime, renewInterval);
rmDtSecretManager.startThreads();
LOG.info("Creating DelegationTokenSecretManager with initialInterval: "
+ initialInterval + ", maxLifetime: " + maxLifetime
+ ", renewInterval: " + renewInterval);
final ClientRMService clientRMService = new ClientRMServiceForTest(conf,
scheduler, rmDtSecretManager);
clientRMService.init(conf);
clientRMService.start();
ClientRMProtocol clientRMWithDT = null;
try {
// Create a user for the renewr and fake the authentication-method
UserGroupInformation loggedInUser = UserGroupInformation
.createRemoteUser("testrenewer@APACHE.ORG");
Assert.assertEquals("testrenewer", loggedInUser.getShortUserName());
// Default realm is APACHE.ORG
loggedInUser.setAuthenticationMethod(AuthenticationMethod.KERBEROS);
DelegationToken token = getDelegationToken(loggedInUser, clientRMService,
loggedInUser.getShortUserName());
long tokenFetchTime = System.currentTimeMillis();
LOG.info("Got delegation token at: " + tokenFetchTime);
// Now try talking to RMService using the delegation token
clientRMWithDT = getClientRMProtocolWithDT(token,
clientRMService.getBindAddress(), "loginuser1", conf);
GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class);
try {
clientRMWithDT.getNewApplication(request);
} catch (UndeclaredThrowableException e) {
fail("Unexpected exception" + e);
}
// Renew after 50% of token age.
while(System.currentTimeMillis() < tokenFetchTime + initialInterval / 2) {
Thread.sleep(500l);
}
long nextExpTime = renewDelegationToken(loggedInUser, clientRMService, token);
long renewalTime = System.currentTimeMillis();
LOG.info("Renewed token at: " + renewalTime + ", NextExpiryTime: "
+ nextExpTime);
// Wait for first expiry, but before renewed expiry.
while (System.currentTimeMillis() > tokenFetchTime + initialInterval
&& System.currentTimeMillis() < nextExpTime) {
Thread.sleep(500l);
}
Thread.sleep(50l);
// Valid token because of renewal.
try {
clientRMWithDT.getNewApplication(request);
} catch (UndeclaredThrowableException e) {
fail("Unexpected exception" + e);
}
// Wait for expiry.
while(System.currentTimeMillis() < renewalTime + renewInterval) {
Thread.sleep(500l);
}
Thread.sleep(50l);
LOG.info("At time: " + System.currentTimeMillis() + ", token should be invalid");
// Token should have expired.
try {
clientRMWithDT.getNewApplication(request);
fail("Should not have succeeded with an expired token");
} catch (UndeclaredThrowableException e) {
assertTrue(e.getCause().getMessage().contains("is expired"));
}
// Test cancellation
// Stop the existing proxy, start another.
if (clientRMWithDT != null) {
RPC.stopProxy(clientRMWithDT);
clientRMWithDT = null;
}
token = getDelegationToken(loggedInUser, clientRMService,
loggedInUser.getShortUserName());
tokenFetchTime = System.currentTimeMillis();
LOG.info("Got delegation token at: " + tokenFetchTime);
// Now try talking to RMService using the delegation token
clientRMWithDT = getClientRMProtocolWithDT(token,
clientRMService.getBindAddress(), "loginuser2", conf);
request = Records.newRecord(GetNewApplicationRequest.class);
try {
clientRMWithDT.getNewApplication(request);
} catch (UndeclaredThrowableException e) {
fail("Unexpected exception" + e);
}
cancelDelegationToken(loggedInUser, clientRMService, token);
if (clientRMWithDT != null) {
RPC.stopProxy(clientRMWithDT);
clientRMWithDT = null;
}
// Creating a new connection.
clientRMWithDT = getClientRMProtocolWithDT(token,
clientRMService.getBindAddress(), "loginuser2", conf);
LOG.info("Cancelled delegation token at: " + System.currentTimeMillis());
// Verify cancellation worked.
try {
clientRMWithDT.getNewApplication(request);
fail("Should not have succeeded with a cancelled delegation token");
} catch (UndeclaredThrowableException e) {
}
} finally {
rmDtSecretManager.stopThreads();
// TODO PRECOMMIT Close proxies.
if (clientRMWithDT != null) {
RPC.stopProxy(clientRMWithDT);
}
}
}
// Get the delegation token directly as it is a little difficult to setup
// the kerberos based rpc.
private DelegationToken getDelegationToken(
final UserGroupInformation loggedInUser,
final ClientRMProtocol clientRMService, final String renewerString)
throws IOException, InterruptedException {
DelegationToken token = loggedInUser
.doAs(new PrivilegedExceptionAction<DelegationToken>() {
@Override
public DelegationToken run() throws YarnRemoteException {
GetDelegationTokenRequest request = Records
.newRecord(GetDelegationTokenRequest.class);
request.setRenewer(renewerString);
return clientRMService.getDelegationToken(request)
.getRMDelegationToken();
}
});
return token;
}
private long renewDelegationToken(final UserGroupInformation loggedInUser,
final ClientRMProtocol clientRMService, final DelegationToken dToken)
throws IOException, InterruptedException {
long nextExpTime = loggedInUser.doAs(new PrivilegedExceptionAction<Long>() {
@Override
public Long run() throws YarnRemoteException {
RenewDelegationTokenRequest request = Records
.newRecord(RenewDelegationTokenRequest.class);
request.setDelegationToken(dToken);
return clientRMService.renewDelegationToken(request)
.getNextExpirationTime();
}
});
return nextExpTime;
}
private void cancelDelegationToken(final UserGroupInformation loggedInUser,
final ClientRMProtocol clientRMService, final DelegationToken dToken)
throws IOException, InterruptedException {
loggedInUser.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws YarnRemoteException {
CancelDelegationTokenRequest request = Records
.newRecord(CancelDelegationTokenRequest.class);
request.setDelegationToken(dToken);
clientRMService.cancelDelegationToken(request);
return null;
}
});
}
private ClientRMProtocol getClientRMProtocolWithDT(DelegationToken token,
final InetSocketAddress rmAddress, String user, final Configuration conf) {
// Maybe consider converting to Hadoop token, serialize de-serialize etc
// before trying to renew the token.
UserGroupInformation ugi = UserGroupInformation
.createRemoteUser(user);
ugi.addToken(ProtoUtils.convertFromProtoFormat(token, rmAddress));
final YarnRPC rpc = YarnRPC.create(conf);
ClientRMProtocol clientRMWithDT = ugi
.doAs(new PrivilegedAction<ClientRMProtocol>() {
@Override
public ClientRMProtocol run() {
return (ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,
rmAddress, conf);
}
});
return clientRMWithDT;
}
class ClientRMServiceForTest extends ClientRMService {
public ClientRMServiceForTest(Configuration conf,
ResourceScheduler scheduler,
RMDelegationTokenSecretManager rmDTSecretManager) {
super(mock(RMContext.class), scheduler, mock(RMAppManager.class),
new ApplicationACLsManager(conf), rmDTSecretManager);
}
// Use a random port unless explicitly specified.
@Override
InetSocketAddress getBindAddress(Configuration conf) {
return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS, 0);
}
@Override
public void stop() {
if (rmDTSecretManager != null) {
rmDTSecretManager.stopThreads();
}
super.stop();
}
}
private static ResourceScheduler createMockScheduler(Configuration conf) {
ResourceScheduler mockSched = mock(ResourceScheduler.class);
doReturn(BuilderUtils.newResource(512)).when(mockSched)
.getMinimumResourceCapability();
doReturn(BuilderUtils.newResource(5120)).when(mockSched)
.getMaximumResourceCapability();
return mockSched;
}
private static RMDelegationTokenSecretManager createRMDelegationTokenSecretManager(
long secretKeyInterval, long tokenMaxLifetime, long tokenRenewInterval) {
RMDelegationTokenSecretManager rmDtSecretManager = new RMDelegationTokenSecretManager(
secretKeyInterval, tokenMaxLifetime, tokenRenewInterval, 3600000);
return rmDtSecretManager;
}
}