MAPREDUCE-3380. Token infrastructure for running clients which are not kerberos authenticated. (mahadev)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1229855 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mahadev Konar 2012-01-11 01:50:10 +00:00
parent 7712e70090
commit bc4b1f48d3
49 changed files with 1726 additions and 94 deletions

View File

@ -455,6 +455,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3639. Fixed TokenCache to work with absent FileSystem canonical
service-names. (Siddharth Seth via vinodkv)
MAPREDUCE-3380. Token infrastructure for running clients which are not kerberos
authenticated. (mahadev)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -37,6 +37,8 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
@ -382,5 +384,12 @@ public class MRClientService extends AbstractService
return response;
}
@Override
public GetDelegationTokenResponse getDelegationToken(
GetDelegationTokenRequest request) throws YarnRemoteException {
throw RPCUtil.getRemoteException("MR AM not authorized to issue delegation" +
" token");
}
}
}

View File

@ -22,6 +22,8 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
@ -54,4 +56,5 @@ public interface MRClientProtocol {
public KillTaskResponse killTask(KillTaskRequest request) throws YarnRemoteException;
public KillTaskAttemptResponse killTaskAttempt(KillTaskAttemptRequest request) throws YarnRemoteException;
public FailTaskAttemptResponse failTaskAttempt(FailTaskAttemptRequest request) throws YarnRemoteException;
public GetDelegationTokenResponse getDelegationToken(GetDelegationTokenRequest request) throws YarnRemoteException;
}

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.api;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
/**
* {@link TokenIdentifier} that identifies delegation tokens
* issued by JobHistoryServer to delegate
* MR tasks talking to the JobHistoryServer.
*/
public class MRDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {
public static final Text KIND_NAME = new Text("MR_DELEGATION_TOKEN");
public MRDelegationTokenIdentifier() {
}
/**
* Create a new delegation token identifier
* @param owner the effective username of the token owner
* @param renewer the username of the renewer
* @param realUser the real username of the token owner
*/
public MRDelegationTokenIdentifier(Text owner, Text renewer, Text realUser) {
super(owner, renewer, realUser);
}
@Override
public Text getKind() {
return KIND_NAME;
}
@InterfaceAudience.Private
public static class Renewer extends Token.TrivialRenewer {
@Override
protected Text getKind() {
return KIND_NAME;
}
}
}

View File

@ -29,6 +29,8 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
@ -51,6 +53,8 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.FailTaskAttemp
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.FailTaskAttemptResponsePBImpl;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.GetCountersRequestPBImpl;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.GetCountersResponsePBImpl;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.GetDelegationTokenRequestPBImpl;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.GetDelegationTokenResponsePBImpl;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.GetDiagnosticsRequestPBImpl;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.GetDiagnosticsResponsePBImpl;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.GetJobReportRequestPBImpl;
@ -71,6 +75,7 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.KillTaskReques
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.KillTaskResponsePBImpl;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.FailTaskAttemptRequestProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.GetCountersRequestProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.GetDelegationTokenRequestProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.GetDiagnosticsRequestProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.GetJobReportRequestProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.GetTaskAttemptCompletionEventsRequestProto;
@ -214,7 +219,26 @@ public class MRClientProtocolPBClientImpl implements MRClientProtocol {
}
}
}
@Override
public GetDelegationTokenResponse getDelegationToken(
GetDelegationTokenRequest request) throws YarnRemoteException {
GetDelegationTokenRequestProto requestProto = ((GetDelegationTokenRequestPBImpl)
request).getProto();
try {
return new GetDelegationTokenResponsePBImpl(proxy.getDelegationToken(
null, requestProto));
} catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) {
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
}
}
@Override
public KillJobResponse killJob(KillJobRequest request)
throws YarnRemoteException {

View File

@ -23,6 +23,8 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
@ -44,6 +46,8 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.FailTaskAttemp
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.FailTaskAttemptResponsePBImpl;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.GetCountersRequestPBImpl;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.GetCountersResponsePBImpl;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.GetDelegationTokenRequestPBImpl;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.GetDelegationTokenResponsePBImpl;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.GetDiagnosticsRequestPBImpl;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.GetDiagnosticsResponsePBImpl;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.GetJobReportRequestPBImpl;
@ -66,6 +70,8 @@ import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.FailTaskAttemptReque
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.FailTaskAttemptResponseProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.GetCountersRequestProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.GetCountersResponseProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.GetDelegationTokenRequestProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.GetDelegationTokenResponseProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.GetDiagnosticsRequestProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.GetDiagnosticsResponseProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.GetJobReportRequestProto;
@ -184,7 +190,20 @@ public class MRClientProtocolPBServiceImpl implements BlockingInterface {
throw new ServiceException(e);
}
}
@Override
public GetDelegationTokenResponseProto getDelegationToken(
RpcController controller, GetDelegationTokenRequestProto proto)
throws ServiceException {
GetDelegationTokenRequest request = new GetDelegationTokenRequestPBImpl(proto);
try {
GetDelegationTokenResponse response = real.getDelegationToken(request);
return ((GetDelegationTokenResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) {
throw new ServiceException(e);
}
}
@Override
public KillJobResponseProto killJob(RpcController controller,
KillJobRequestProto proto) throws ServiceException {

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
@Public
@Evolving
public interface GetDelegationTokenRequest {
String getRenewer();
void setRenewer(String renewer);
}

View File

@ -0,0 +1,25 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.api.protocolrecords;
import org.apache.hadoop.yarn.api.records.DelegationToken;
public interface GetDelegationTokenResponse {
void setDelegationToken(DelegationToken clientDToken);
DelegationToken getDelegationToken();
}

View File

@ -0,0 +1,97 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.GetDelegationTokenRequestProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.GetDelegationTokenRequestProtoOrBuilder;
import org.apache.hadoop.yarn.api.records.ProtoBase;
public class GetDelegationTokenRequestPBImpl extends
ProtoBase<GetDelegationTokenRequestProto> implements GetDelegationTokenRequest {
String renewer;
GetDelegationTokenRequestProto proto =
GetDelegationTokenRequestProto.getDefaultInstance();
GetDelegationTokenRequestProto.Builder builder = null;
boolean viaProto = false;
public GetDelegationTokenRequestPBImpl() {
builder = GetDelegationTokenRequestProto.newBuilder();
}
public GetDelegationTokenRequestPBImpl (
GetDelegationTokenRequestProto proto) {
this.proto = proto;
viaProto = true;
}
@Override
public String getRenewer(){
GetDelegationTokenRequestProtoOrBuilder p = viaProto ? proto : builder;
if (this.renewer != null) {
return this.renewer;
}
if (!p.hasRenewer()) {
return null;
}
this.renewer = p.getRenewer();
return this.renewer;
}
@Override
public void setRenewer(String renewer) {
maybeInitBuilder();
if (renewer == null)
builder.clearRenewer();
this.renewer = renewer;
}
@Override
public GetDelegationTokenRequestProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToBuilder() {
if (renewer != null) {
builder.setRenewer(this.renewer);
}
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = GetDelegationTokenRequestProto.newBuilder(proto);
}
viaProto = false;
}
}

View File

@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.GetDelegationTokenResponseProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.GetDelegationTokenResponseProtoOrBuilder;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.DelegationTokenPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.DelegationTokenProto;
public class GetDelegationTokenResponsePBImpl extends
ProtoBase<GetDelegationTokenResponseProto> implements GetDelegationTokenResponse {
DelegationToken mrToken;
GetDelegationTokenResponseProto proto =
GetDelegationTokenResponseProto.getDefaultInstance();
GetDelegationTokenResponseProto.Builder builder = null;
boolean viaProto = false;
public GetDelegationTokenResponsePBImpl() {
builder = GetDelegationTokenResponseProto.newBuilder();
}
public GetDelegationTokenResponsePBImpl (
GetDelegationTokenResponseProto proto) {
this.proto = proto;
viaProto = true;
}
@Override
public DelegationToken getDelegationToken() {
GetDelegationTokenResponseProtoOrBuilder p = viaProto ? proto : builder;
if (this.mrToken != null) {
return this.mrToken;
}
if (!p.hasMRDelegationToken()) {
return null;
}
this.mrToken = convertFromProtoFormat(p.getMRDelegationToken());
return this.mrToken;
}
@Override
public void setDelegationToken(DelegationToken mrToken) {
maybeInitBuilder();
if (mrToken == null)
builder.clearMRDelegationToken();
this.mrToken = mrToken;
}
@Override
public GetDelegationTokenResponseProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToBuilder() {
if (mrToken != null) {
builder.setMRDelegationToken(convertToProtoFormat(this.mrToken));
}
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = GetDelegationTokenResponseProto.newBuilder(proto);
}
viaProto = false;
}
private DelegationTokenPBImpl convertFromProtoFormat(DelegationTokenProto p) {
return new DelegationTokenPBImpl(p);
}
private DelegationTokenProto convertToProtoFormat(DelegationToken t) {
return ((DelegationTokenPBImpl)t).getProto();
}
}

View File

@ -24,7 +24,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenInfo;
import org.apache.hadoop.security.token.TokenSelector;
import org.apache.hadoop.yarn.proto.MRClientProtocol;
public class ClientHSSecurityInfo extends SecurityInfo {
@ -56,7 +58,22 @@ public class ClientHSSecurityInfo extends SecurityInfo {
@Override
public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) {
return null;
}
if (!protocol
.equals(MRClientProtocol.MRClientProtocolService.BlockingInterface.class)) {
return null;
}
return new TokenInfo() {
@Override
public Class<? extends Annotation> annotationType() {
return null;
}
@Override
public Class<? extends TokenSelector<? extends TokenIdentifier>>
value() {
return ClientHSTokenSelector.class;
}
}; }
}

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.security.client;
import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenSelector;
public class ClientHSTokenSelector implements
TokenSelector<MRDelegationTokenIdentifier> {
private static final Log LOG = LogFactory
.getLog(ClientHSTokenSelector.class);
@SuppressWarnings("unchecked")
public Token<MRDelegationTokenIdentifier> selectToken(Text service,
Collection<Token<? extends TokenIdentifier>> tokens) {
if (service == null) {
return null;
}
LOG.debug("Looking for a token with service " + service.toString());
for (Token<? extends TokenIdentifier> token : tokens) {
if (LOG.isDebugEnabled()) {
LOG.debug("Token kind is " + token.getKind().toString()
+ " and the token's service name is " + token.getService());
}
if (MRDelegationTokenIdentifier.KIND_NAME.equals(token.getKind())
&& service.equals(token.getService())) {
return (Token<MRDelegationTokenIdentifier>) token;
}
}
return null;
}
}

View File

@ -30,7 +30,7 @@ service MRClientProtocolService {
rpc getTaskAttemptCompletionEvents (GetTaskAttemptCompletionEventsRequestProto) returns (GetTaskAttemptCompletionEventsResponseProto);
rpc getTaskReports (GetTaskReportsRequestProto) returns (GetTaskReportsResponseProto);
rpc getDiagnostics (GetDiagnosticsRequestProto) returns (GetDiagnosticsResponseProto);
rpc getDelegationToken (GetDelegationTokenRequestProto) returns (GetDelegationTokenResponseProto);
rpc killJob (KillJobRequestProto) returns (KillJobResponseProto);
rpc killTask (KillTaskRequestProto) returns (KillTaskResponseProto);
rpc killTaskAttempt (KillTaskAttemptRequestProto) returns (KillTaskAttemptResponseProto);

View File

@ -22,6 +22,7 @@ option java_generic_services = true;
option java_generate_equals_and_hash = true;
import "mr_protos.proto";
import "yarn_protos.proto";
message GetJobReportRequestProto {
optional JobIdProto job_id = 1;
@ -75,6 +76,13 @@ message GetDiagnosticsResponseProto {
repeated string diagnostics = 1;
}
message GetDelegationTokenRequestProto {
optional string renewer = 1;
}
message GetDelegationTokenResponseProto {
optional DelegationTokenProto m_r_delegation_token = 1;
}
message KillJobRequestProto {
optional JobIdProto job_id = 1;

View File

@ -30,6 +30,8 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
@ -123,28 +125,24 @@ public class TestRPCFactories {
@Override
public GetJobReportResponse getJobReport(GetJobReportRequest request)
throws YarnRemoteException {
// TODO Auto-generated method stub
return null;
}
@Override
public GetTaskReportResponse getTaskReport(GetTaskReportRequest request)
throws YarnRemoteException {
// TODO Auto-generated method stub
return null;
}
@Override
public GetTaskAttemptReportResponse getTaskAttemptReport(
GetTaskAttemptReportRequest request) throws YarnRemoteException {
// TODO Auto-generated method stub
return null;
}
@Override
public GetCountersResponse getCounters(GetCountersRequest request)
throws YarnRemoteException {
// TODO Auto-generated method stub
return null;
}
@ -152,51 +150,49 @@ public class TestRPCFactories {
public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(
GetTaskAttemptCompletionEventsRequest request)
throws YarnRemoteException {
// TODO Auto-generated method stub
return null;
}
@Override
public GetTaskReportsResponse getTaskReports(GetTaskReportsRequest request)
throws YarnRemoteException {
// TODO Auto-generated method stub
return null;
}
@Override
public GetDiagnosticsResponse getDiagnostics(GetDiagnosticsRequest request)
throws YarnRemoteException {
// TODO Auto-generated method stub
return null;
}
@Override
public KillJobResponse killJob(KillJobRequest request)
throws YarnRemoteException {
// TODO Auto-generated method stub
return null;
}
@Override
public KillTaskResponse killTask(KillTaskRequest request)
throws YarnRemoteException {
// TODO Auto-generated method stub
return null;
}
@Override
public KillTaskAttemptResponse killTaskAttempt(
KillTaskAttemptRequest request) throws YarnRemoteException {
// TODO Auto-generated method stub
return null;
}
@Override
public FailTaskAttemptResponse failTaskAttempt(
FailTaskAttemptRequest request) throws YarnRemoteException {
// TODO Auto-generated method stub
return null;
}
@Override
public GetDelegationTokenResponse getDelegationToken(
GetDelegationTokenRequest request) throws YarnRemoteException {
return null;
}
}
}

View File

@ -140,7 +140,20 @@ import org.apache.hadoop.util.ToolRunner;
public class JobClient extends CLI {
public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;
/* notes that get delegation token was called. Again this is hack for oozie
* to make sure we add history server delegation tokens to the credentials
* for the job. Since the api only allows one delegation token to be returned,
* we have to add this hack.
*/
private boolean getDelegationTokenCalled = false;
/* notes the renewer that will renew the delegation token */
private Text dtRenewer = null;
/* do we need a HS delegation token for this client */
static final String HS_DELEGATION_TOKEN_REQUIRED
= "mapreduce.history.server.delegationtoken.required";
static final String HS_DELEGATION_TOKEN_RENEWER
= "mapreduce.history.server.delegationtoken.renewer";
static{
ConfigUtil.loadResources();
}
@ -584,6 +597,12 @@ public class JobClient extends CLI {
try {
conf.setBooleanIfUnset("mapred.mapper.new-api", false);
conf.setBooleanIfUnset("mapred.reducer.new-api", false);
if (getDelegationTokenCalled) {
conf.setBoolean(HS_DELEGATION_TOKEN_REQUIRED, getDelegationTokenCalled);
getDelegationTokenCalled = false;
conf.set(HS_DELEGATION_TOKEN_RENEWER, dtRenewer.toString());
dtRenewer = null;
}
Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job> () {
@Override
public Job run() throws IOException, ClassNotFoundException,
@ -1170,6 +1189,8 @@ public class JobClient extends CLI {
*/
public Token<DelegationTokenIdentifier>
getDelegationToken(final Text renewer) throws IOException, InterruptedException {
getDelegationTokenCalled = true;
dtRenewer = renewer;
return clientUgi.doAs(new
PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
public Token<DelegationTokenIdentifier> run() throws IOException,

View File

@ -279,6 +279,12 @@ public interface MRJobConfig {
public static final String JOB_ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
public static final String DEFAULT_JOB_ACL_MODIFY_JOB = " ";
/* config for tracking the local file where all the credentials for the job
* credentials.
*/
public static final String MAPREDUCE_JOB_CREDENTIALS_BINARY =
"mapreduce.job.credentials.binary";
public static final String JOB_SUBMITHOST =
"mapreduce.job.submithostname";

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Master;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
@ -101,7 +102,7 @@ public class TokenCache {
String delegTokenRenewer = Master.getMasterPrincipal(conf);
if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
throw new IOException(
"Can't get JobTracker Kerberos principal for use as renewer");
"Can't get Master Kerberos principal for use as renewer");
}
boolean readFile = true;
@ -112,7 +113,7 @@ public class TokenCache {
if (readFile) {
readFile = false;
String binaryTokenFilename =
conf.get("mapreduce.job.credentials.binary");
conf.get(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY);
if (binaryTokenFilename != null) {
Credentials binary;
try {

View File

@ -35,10 +35,13 @@ import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
@ -68,13 +71,17 @@ import org.apache.hadoop.mapreduce.v2.hs.webapp.HsWebApp;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
@ -92,11 +99,14 @@ public class HistoryClientService extends AbstractService {
private WebApp webApp;
private InetSocketAddress bindAddress;
private HistoryContext history;
public HistoryClientService(HistoryContext history) {
private JHSDelegationTokenSecretManager jhsDTSecretManager;
public HistoryClientService(HistoryContext history,
JHSDelegationTokenSecretManager jhsDTSecretManager) {
super("HistoryClientService");
this.history = history;
this.protocolHandler = new MRClientProtocolHandler();
this.jhsDTSecretManager = jhsDTSecretManager;
}
public void start() {
@ -110,14 +120,15 @@ public class HistoryClientService extends AbstractService {
JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS);
InetAddress hostNameResolved = null;
try {
hostNameResolved = InetAddress.getLocalHost(); //address.getAddress().getLocalHost();
hostNameResolved = InetAddress.getLocalHost();
//address.getAddress().getLocalHost();
} catch (UnknownHostException e) {
throw new YarnException(e);
}
server =
rpc.getServer(MRClientProtocol.class, protocolHandler, address,
conf, null,
conf, jhsDTSecretManager,
conf.getInt(JHAdminConfig.MR_HISTORY_CLIENT_THREAD_COUNT,
JHAdminConfig.DEFAULT_MR_HISTORY_CLIENT_THREAD_COUNT));
@ -278,6 +289,38 @@ public class HistoryClientService extends AbstractService {
}
return response;
}
@Override
public GetDelegationTokenResponse getDelegationToken(
GetDelegationTokenRequest request) throws YarnRemoteException {
try {
// Verify that the connection is kerberos authenticated
AuthenticationMethod authMethod = UserGroupInformation
.getRealAuthenticationMethod(UserGroupInformation.getCurrentUser());
if (UserGroupInformation.isSecurityEnabled()
&& (authMethod != AuthenticationMethod.KERBEROS)) {
throw new IOException(
"Delegation Token can be issued only with kerberos authentication");
}
GetDelegationTokenResponse response = recordFactory.newRecordInstance(
GetDelegationTokenResponse.class);
MRDelegationTokenIdentifier tokenIdentifier =
new MRDelegationTokenIdentifier();
Token<MRDelegationTokenIdentifier> realJHSToken =
new Token<MRDelegationTokenIdentifier>(tokenIdentifier,
jhsDTSecretManager);
DelegationToken mrDToken = BuilderUtils.newDelegationToken(
realJHSToken.getIdentifier(), realJHSToken.getKind().toString(),
realJHSToken.getPassword(), bindAddress.getAddress().getHostAddress()
+ ":" + bindAddress.getPort());
response.setDelegationToken(mrDToken);
return response;
} catch (IOException i) {
throw RPCUtil.getRemoteException(i);
}
}
private void checkAccess(Job job, JobACL jobOperation)
throws YarnRemoteException {

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.hs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
/**
* A MapReduce specific delegation token secret manager.
* The secret manager is responsible for generating and accepting the password
* for each token.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class JHSDelegationTokenSecretManager
extends AbstractDelegationTokenSecretManager<MRDelegationTokenIdentifier> {
/**
* Create a secret manager
* @param delegationKeyUpdateInterval the number of seconds for rolling new
* secret keys.
* @param delegationTokenMaxLifetime the maximum lifetime of the delegation
* tokens
* @param delegationTokenRenewInterval how often the tokens must be renewed
* @param delegationTokenRemoverScanInterval how often the tokens are scanned
* for expired tokens
*/
public JHSDelegationTokenSecretManager(long delegationKeyUpdateInterval,
long delegationTokenMaxLifetime,
long delegationTokenRenewInterval,
long delegationTokenRemoverScanInterval) {
super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
}
@Override
public MRDelegationTokenIdentifier createIdentifier() {
return new MRDelegationTokenIdentifier();
}
}

View File

@ -24,6 +24,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.StringUtils;
@ -41,6 +42,7 @@ public class JobHistoryServer extends CompositeService {
private HistoryContext historyContext;
private HistoryClientService clientService;
private JobHistory jobHistoryService;
private JHSDelegationTokenSecretManager jhsDTSecretManager;
public JobHistoryServer() {
super(JobHistoryServer.class.getName());
@ -56,17 +58,52 @@ public class JobHistoryServer extends CompositeService {
}
jobHistoryService = new JobHistory();
historyContext = (HistoryContext)jobHistoryService;
clientService = new HistoryClientService(historyContext);
this.jhsDTSecretManager = createJHSSecretManager(conf);
clientService = new HistoryClientService(historyContext,
this.jhsDTSecretManager);
addService(jobHistoryService);
addService(clientService);
super.init(config);
}
protected JHSDelegationTokenSecretManager createJHSSecretManager(
Configuration conf) {
long secretKeyInterval =
conf.getLong(MRConfig.DELEGATION_KEY_UPDATE_INTERVAL_KEY,
MRConfig.DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
long tokenMaxLifetime =
conf.getLong(MRConfig.DELEGATION_TOKEN_MAX_LIFETIME_KEY,
MRConfig.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
long tokenRenewInterval =
conf.getLong(MRConfig.DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
MRConfig.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
return new JHSDelegationTokenSecretManager(secretKeyInterval,
tokenMaxLifetime, tokenRenewInterval, 3600000);
}
protected void doSecureLogin(Configuration conf) throws IOException {
SecurityUtil.login(conf, JHAdminConfig.MR_HISTORY_KEYTAB,
JHAdminConfig.MR_HISTORY_PRINCIPAL);
}
@Override
public void start() {
try {
jhsDTSecretManager.startThreads();
} catch(IOException io) {
LOG.error("Error while starting the Secret Manager threads", io);
throw new RuntimeException(io);
}
super.start();
}
@Override
public void stop() {
jhsDTSecretManager.stopThreads();
super.stop();
}
public static void main(String[] args) {
StringUtils.startupShutdownMessage(JobHistoryServer.class, args, LOG);
try {

View File

@ -70,7 +70,15 @@ public class ClientCache {
return client;
}
private MRClientProtocol instantiateHistoryProxy()
protected synchronized MRClientProtocol getInitializedHSProxy()
throws IOException {
if (this.hsProxy == null) {
hsProxy = instantiateHistoryProxy();
}
return this.hsProxy;
}
protected MRClientProtocol instantiateHistoryProxy()
throws IOException {
final String serviceAddr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS);
if (StringUtils.isEmpty(serviceAddr)) {

View File

@ -27,6 +27,8 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
@ -201,4 +203,10 @@ public class NotRunningJob implements MRClientProtocol {
return resp;
}
@Override
public GetDelegationTokenResponse getDelegationToken(
GetDelegationTokenRequest request) throws YarnRemoteException {
/* Should not be invoked by anyone. */
throw new NotImplementedException();
}
}

View File

@ -37,6 +37,8 @@ import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
@ -59,12 +61,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
// TODO: This should be part of something like yarn-client.
@ -156,11 +160,20 @@ public class ResourceMgrDelegate {
}
public Token<DelegationTokenIdentifier> getDelegationToken(Text arg0)
@SuppressWarnings("rawtypes")
public Token getDelegationToken(Text renewer)
throws IOException, InterruptedException {
// TODO: Implement getDelegationToken
LOG.warn("getDelegationToken - Not Implemented");
return null;
/* get the token from RM */
org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest
rmDTRequest = recordFactory.newRecordInstance(
org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest.class);
rmDTRequest.setRenewer(renewer.toString());
org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse
response = applicationsManager.getDelegationToken(rmDTRequest);
DelegationToken yarnToken = response.getRMDelegationToken();
return new Token<RMDelegationTokenIdentifier>(yarnToken.getIdentifier().array(),
yarnToken.getPassword().array(),
new Text(yarnToken.getKind()), new Text(yarnToken.getService()));
}

View File

@ -28,6 +28,7 @@ import java.util.Vector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
@ -54,6 +55,9 @@ import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.Credentials;
@ -68,6 +72,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@ -84,6 +89,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
/**
* This class enables the current JobClient (0.22 hadoop) to run on YARN.
*/
@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
public class YARNRunner implements ClientProtocol {
private static final Log LOG = LogFactory.getLog(YARNRunner.class);
@ -93,7 +99,15 @@ public class YARNRunner implements ClientProtocol {
private ClientCache clientCache;
private Configuration conf;
private final FileContext defaultFileContext;
/* usually is false unless the jobclient getdelegation token is
* called. This is a hack wherein we do return a token from RM
* on getDelegationtoken but due to the restricted api on jobclient
* we just add a job history DT token when submitting a job.
*/
private static final boolean DEFAULT_HS_DELEGATION_TOKEN_REQUIRED =
false;
/**
* Yarn runner incapsulates the client interface of
* yarn
@ -131,7 +145,16 @@ public class YARNRunner implements ClientProtocol {
throw new RuntimeException("Error in instantiating YarnClient", ufe);
}
}
@Private
/**
* Used for testing mostly.
* @param resMgrDelegate the resource manager delegate to set to.
*/
public void setResourceMgrDelegate(ResourceMgrDelegate resMgrDelegate) {
this.resMgrDelegate = resMgrDelegate;
}
@Override
public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
throws IOException, InterruptedException {
@ -161,10 +184,26 @@ public class YARNRunner implements ClientProtocol {
return resMgrDelegate.getClusterMetrics();
}
private Token<MRDelegationTokenIdentifier> getDelegationTokenFromHS(
MRClientProtocol hsProxy, Text renewer) throws IOException,
InterruptedException {
GetDelegationTokenRequest request = recordFactory
.newRecordInstance(GetDelegationTokenRequest.class);
request.setRenewer(renewer.toString());
DelegationToken mrDelegationToken = hsProxy.getDelegationToken(request)
.getDelegationToken();
return new Token<MRDelegationTokenIdentifier>(mrDelegationToken
.getIdentifier().array(), mrDelegationToken.getPassword().array(),
new Text(mrDelegationToken.getKind()), new Text(
mrDelegationToken.getService()));
}
@Override
public Token<DelegationTokenIdentifier> getDelegationToken(Text arg0)
public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
throws IOException, InterruptedException {
return resMgrDelegate.getDelegationToken(arg0);
// The token is only used for serialization. So the type information
// mismatch should be fine.
return resMgrDelegate.getDelegationToken(renewer);
}
@Override
@ -224,7 +263,16 @@ public class YARNRunner implements ClientProtocol {
@Override
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
throws IOException, InterruptedException {
// JobClient will set this flag if getDelegationToken is called, if so, get
// the delegation tokens for the HistoryServer also.
if (conf.getBoolean(JobClient.HS_DELEGATION_TOKEN_REQUIRED,
DEFAULT_HS_DELEGATION_TOKEN_REQUIRED)) {
Token hsDT = getDelegationTokenFromHS(clientCache.
getInitializedHSProxy(), new Text(
conf.get(JobClient.HS_DELEGATION_TOKEN_RENEWER)));
ts.addToken(hsDT.getService(), hsDT);
}
// Upload only in security mode: TODO
Path applicationTokensFile =
new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);

View File

@ -76,6 +76,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
@ -330,6 +332,12 @@ public class TestClientRedirect {
GetQueueUserAclsInfoRequest request) throws YarnRemoteException {
return null;
}
@Override
public GetDelegationTokenResponse getDelegationToken(
GetDelegationTokenRequest request) throws YarnRemoteException {
return null;
}
}
class HistoryService extends AMService {
@ -482,6 +490,13 @@ public class TestClientRedirect {
FailTaskAttemptRequest request) throws YarnRemoteException {
return null;
}
@Override
public org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse getDelegationToken(
org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest request)
throws YarnRemoteException {
return null;
}
}
static Counters getMyCounters() {

View File

@ -18,18 +18,34 @@
package org.apache.hadoop.mapreduce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.nio.ByteBuffer;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.ResourceMgrDelegate;
import org.apache.hadoop.mapred.YARNRunner;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.junit.Test;
public class TestYarnClientProtocolProvider extends TestCase {
private static final RecordFactory recordFactory = RecordFactoryProvider.
getRecordFactory(null);
@Test
public void testClusterWithYarnClientProtocolProvider() throws Exception {
@ -68,7 +84,24 @@ public class TestYarnClientProtocolProvider extends TestCase {
conf = new Configuration();
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
cluster = new Cluster(conf);
cluster.getDelegationToken(new Text(" "));
YARNRunner yrunner = (YARNRunner) cluster.getClient();
GetDelegationTokenResponse getDTResponse =
recordFactory.newRecordInstance(GetDelegationTokenResponse.class);
DelegationToken rmDTToken = recordFactory.newRecordInstance(
DelegationToken.class);
rmDTToken.setIdentifier(ByteBuffer.wrap(new byte[2]));
rmDTToken.setKind("Testclusterkind");
rmDTToken.setPassword(ByteBuffer.wrap("testcluster".getBytes()));
rmDTToken.setService("0.0.0.0:8040");
getDTResponse.setRMDelegationToken(rmDTToken);
ClientRMProtocol cRMProtocol = mock(ClientRMProtocol.class);
when(cRMProtocol.getDelegationToken(any(
GetDelegationTokenRequest.class))).thenReturn(getDTResponse);
ResourceMgrDelegate rmgrDelegate = new ResourceMgrDelegate(
new YarnConfiguration(conf), cRMProtocol);
yrunner.setResourceMgrDelegate(rmgrDelegate);
Token t = cluster.getDelegationToken(new Text(" "));
assertTrue("Testclusterkind".equals(t.getKind().toString()));
} finally {
if (cluster != null) {
cluster.close();

View File

@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.api;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
@ -43,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
@ -230,4 +233,19 @@ public interface ClientRMProtocol {
public GetQueueUserAclsInfoResponse getQueueUserAcls(
GetQueueUserAclsInfoRequest request)
throws YarnRemoteException;
/**
* <p>The interface used by clients to get delegation token, enabling the
* containers to be able to talk to the service using those tokens.
*
* <p> The <code>ResourceManager</code> responds with the delegation token
* {@link DelegationToken} that can be used by the client to speak to this
* service.
* @param request request to get a delegation token for the client.
* @return delegation token that can be used to talk to this service
* @throws YarnRemoteException
*/
public GetDelegationTokenResponse getDelegationToken(
GetDelegationTokenRequest request)
throws YarnRemoteException;
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import clover.org.apache.velocity.runtime.resource.ResourceManager;
/**
* The request issued by the client to get a delegation token from
* the {@link ResourceManager}.
* for more information.
*/
@Public
@Evolving
public interface GetDelegationTokenRequest {
String getRenewer();
void setRenewer(String renewer);
}

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.api.records.DelegationToken;
/**
* Response to a {@link GetDelegationTokenRequest} request
* from the client. The response contains the token that
* can be used by the containers to talk to ClientRMService.
*
*/
@Public
@Evolving
public interface GetDelegationTokenResponse {
DelegationToken getRMDelegationToken();
void setRMDelegationToken(DelegationToken rmDTToken);
}

View File

@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetDelegationTokenRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetDelegationTokenRequestProtoOrBuilder;
public class GetDelegationTokenRequestPBImpl extends
ProtoBase<GetDelegationTokenRequestProto> implements GetDelegationTokenRequest {
String renewer;
GetDelegationTokenRequestProto proto =
GetDelegationTokenRequestProto.getDefaultInstance();
GetDelegationTokenRequestProto.Builder builder = null;
boolean viaProto = false;
public GetDelegationTokenRequestPBImpl() {
builder = GetDelegationTokenRequestProto.newBuilder();
}
public GetDelegationTokenRequestPBImpl (
GetDelegationTokenRequestProto proto) {
this.proto = proto;
viaProto = true;
}
@Override
public String getRenewer(){
GetDelegationTokenRequestProtoOrBuilder p = viaProto ? proto : builder;
if (this.renewer != null) {
return this.renewer;
}
if (!p.hasRenewer()) {
return null;
}
this.renewer = p.getRenewer();
return this.renewer;
}
@Override
public void setRenewer(String renewer) {
maybeInitBuilder();
if (renewer == null)
builder.clearRenewer();
this.renewer = renewer;
}
@Override
public GetDelegationTokenRequestProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToBuilder() {
if (renewer != null) {
builder.setRenewer(this.renewer);
}
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = GetDelegationTokenRequestProto.newBuilder(proto);
}
viaProto = false;
}
}

View File

@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.DelegationTokenPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.DelegationTokenProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetDelegationTokenResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetDelegationTokenResponseProtoOrBuilder;
public class GetDelegationTokenResponsePBImpl extends
ProtoBase<GetDelegationTokenResponseProto> implements GetDelegationTokenResponse {
DelegationToken appToken;
GetDelegationTokenResponseProto proto =
GetDelegationTokenResponseProto.getDefaultInstance();
GetDelegationTokenResponseProto.Builder builder = null;
boolean viaProto = false;
public GetDelegationTokenResponsePBImpl() {
builder = GetDelegationTokenResponseProto.newBuilder();
}
public GetDelegationTokenResponsePBImpl (
GetDelegationTokenResponseProto proto) {
this.proto = proto;
viaProto = true;
}
@Override
public DelegationToken getRMDelegationToken() {
GetDelegationTokenResponseProtoOrBuilder p = viaProto ? proto : builder;
if (this.appToken != null) {
return this.appToken;
}
if (!p.hasApplicationToken()) {
return null;
}
this.appToken = convertFromProtoFormat(p.getApplicationToken());
return this.appToken;
}
@Override
public void setRMDelegationToken(DelegationToken appToken) {
maybeInitBuilder();
if (appToken == null)
builder.clearApplicationToken();
this.appToken = appToken;
}
@Override
public GetDelegationTokenResponseProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToBuilder() {
if (appToken != null) {
builder.setApplicationToken(convertToProtoFormat(this.appToken));
}
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = GetDelegationTokenResponseProto.newBuilder(proto);
}
viaProto = false;
}
private DelegationTokenPBImpl convertFromProtoFormat(DelegationTokenProto p) {
return new DelegationTokenPBImpl(p);
}
private DelegationTokenProto convertToProtoFormat(DelegationToken t) {
return ((DelegationTokenPBImpl)t).getProto();
}
}

View File

@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
/**
* The Delegation tokens have a identifier which maps to
* {@link AbstractDelegationTokenIdentifier}.
*
*/
@Public
@Evolving
public interface DelegationToken {
/**
* Get the token identifier.
* @return token identifier
*/
@Public
@Stable
ByteBuffer getIdentifier();
@Private
@Stable
void setIdentifier(ByteBuffer identifier);
/**
* Get the token password
* @return token password
*/
@Public
@Stable
ByteBuffer getPassword();
@Private
@Stable
void setPassword(ByteBuffer password);
/**
* Get the token kind.
* @return token kind
*/
@Public
@Stable
String getKind();
@Private
@Stable
void setKind(String kind);
/**
* Get the service to which the token is allocated.
* @return service to which the token is allocated
*/
@Public
@Stable
String getService();
@Private
@Stable
void setService(String service);
}

View File

@ -0,0 +1,156 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.impl.pb;
import java.nio.ByteBuffer;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.proto.YarnProtos.DelegationTokenProto;
import org.apache.hadoop.yarn.proto.YarnProtos.DelegationTokenProtoOrBuilder;
public class DelegationTokenPBImpl extends ProtoBase<DelegationTokenProto>
implements DelegationToken {
private DelegationTokenProto proto = DelegationTokenProto.getDefaultInstance();
private DelegationTokenProto.Builder builder = null;
private boolean viaProto = false;
private ByteBuffer identifier;
private ByteBuffer password;
public DelegationTokenPBImpl() {
builder = DelegationTokenProto.newBuilder();
}
public DelegationTokenPBImpl(DelegationTokenProto proto) {
this.proto = proto;
viaProto = true;
}
public synchronized DelegationTokenProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private synchronized void mergeLocalToBuilder() {
if (this.identifier != null) {
builder.setIdentifier(convertToProtoFormat(this.identifier));
}
if (this.password != null) {
builder.setPassword(convertToProtoFormat(this.password));
}
}
private synchronized void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private synchronized void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = DelegationTokenProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public synchronized ByteBuffer getIdentifier() {
DelegationTokenProtoOrBuilder p = viaProto ? proto : builder;
if (this.identifier != null) {
return this.identifier;
}
if (!p.hasIdentifier()) {
return null;
}
this.identifier = convertFromProtoFormat(p.getIdentifier());
return this.identifier;
}
@Override
public synchronized void setIdentifier(ByteBuffer identifier) {
maybeInitBuilder();
if (identifier == null)
builder.clearIdentifier();
this.identifier = identifier;
}
@Override
public synchronized ByteBuffer getPassword() {
DelegationTokenProtoOrBuilder p = viaProto ? proto : builder;
if (this.password != null) {
return this.password;
}
if (!p.hasPassword()) {
return null;
}
this.password = convertFromProtoFormat(p.getPassword());
return this.password;
}
@Override
public synchronized void setPassword(ByteBuffer password) {
maybeInitBuilder();
if (password == null)
builder.clearPassword();
this.password = password;
}
@Override
public synchronized String getKind() {
DelegationTokenProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasKind()) {
return null;
}
return (p.getKind());
}
@Override
public synchronized void setKind(String kind) {
maybeInitBuilder();
if (kind == null) {
builder.clearKind();
return;
}
builder.setKind((kind));
}
@Override
public synchronized String getService() {
DelegationTokenProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasService()) {
return null;
}
return (p.getService());
}
@Override
public synchronized void setService(String service) {
maybeInitBuilder();
if (service == null) {
builder.clearService();
return;
}
builder.setService((service));
}
}

View File

@ -33,5 +33,6 @@ service ClientRMProtocolService {
rpc getClusterNodes (GetClusterNodesRequestProto) returns (GetClusterNodesResponseProto);
rpc getQueueInfo (GetQueueInfoRequestProto) returns (GetQueueInfoResponseProto);
rpc getQueueUserAcls (GetQueueUserAclsInfoRequestProto) returns (GetQueueUserAclsInfoResponseProto);
rpc getDelegationToken(GetDelegationTokenRequestProto) returns (GetDelegationTokenResponseProto);
}

View File

@ -65,6 +65,13 @@ message ContainerTokenProto {
optional string service = 4;
}
message DelegationTokenProto {
optional bytes identifier = 1;
optional bytes password = 2;
optional string kind = 3;
optional string service = 4;
}
message ContainerProto {
optional ContainerIdProto id = 1;
optional NodeIdProto nodeId = 2;

View File

@ -138,6 +138,15 @@ message GetQueueUserAclsInfoResponseProto {
repeated QueueUserACLInfoProto queueUserAcls = 1;
}
message GetDelegationTokenRequestProto {
optional string renewer = 1;
}
message GetDelegationTokenResponseProto {
optional DelegationTokenProto application_token = 1;
}
//////////////////////////////////////////////////////
/////// client_NM_Protocol ///////////////////////////
//////////////////////////////////////////////////////

View File

@ -33,6 +33,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
@ -51,6 +53,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetClusterMetricsReque
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetClusterMetricsResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetClusterNodesRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetClusterNodesResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetDelegationTokenRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetDelegationTokenResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewApplicationResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueInfoRequestPBImpl;
@ -68,6 +72,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsRequestP
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationReportRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterMetricsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterNodesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetDelegationTokenRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNewApplicationRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueInfoRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoRequestProto;
@ -247,4 +252,22 @@ public class ClientRMProtocolPBClientImpl implements ClientRMProtocol {
}
}
@Override
public GetDelegationTokenResponse getDelegationToken(
GetDelegationTokenRequest request) throws YarnRemoteException {
GetDelegationTokenRequestProto requestProto =
((GetDelegationTokenRequestPBImpl)request).getProto();
try {
return new GetDelegationTokenResponsePBImpl(
proxy.getDelegationToken(null, requestProto));
} catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) {
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
}
}
}

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
@ -36,6 +37,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetClusterMetricsReque
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetClusterMetricsResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetClusterNodesRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetClusterNodesResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetDelegationTokenRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetDelegationTokenResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewApplicationResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueInfoRequestPBImpl;
@ -56,6 +59,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterMetricsRequestPr
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterMetricsResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterNodesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterNodesResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetDelegationTokenRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetDelegationTokenResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNewApplicationRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNewApplicationResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueInfoRequestProto;
@ -194,4 +199,17 @@ public class ClientRMProtocolPBServiceImpl implements BlockingInterface {
}
}
@Override
public GetDelegationTokenResponseProto getDelegationToken(
RpcController controller, GetDelegationTokenRequestProto proto)
throws ServiceException {
GetDelegationTokenRequestPBImpl request =
new GetDelegationTokenRequestPBImpl(proto);
try {
GetDelegationTokenResponse response = real.getDelegationToken(request);
return ((GetDelegationTokenResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) {
throw new ServiceException(e);
}
}
}

View File

@ -189,10 +189,26 @@ public class YarnConfiguration extends Configuration {
/** The class to use as the resource scheduler.*/
public static final String RM_SCHEDULER =
RM_PREFIX + "scheduler.class";
//Delegation token related keys
public static final String DELEGATION_KEY_UPDATE_INTERVAL_KEY =
RM_PREFIX + "delegation.key.update-interval";
public static final long DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT =
24*60*60*1000; // 1 day
public static final String DELEGATION_TOKEN_RENEW_INTERVAL_KEY =
RM_PREFIX + "delegation.token.renew-interval";
public static final long DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT =
24*60*60*1000; // 1 day
public static final String DELEGATION_TOKEN_MAX_LIFETIME_KEY =
RM_PREFIX + "delegation.token.max-lifetime";
public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT =
7*24*60*60*1000; // 7 days
/** The class to use as the persistent store.*/
public static final String RM_STORE = RM_PREFIX + "store.class";
/** The address of the zookeeper instance to use with ZK store.*/
public static final String RM_ZK_STORE_ADDRESS =
RM_PREFIX + "zookeeper-store.address";

View File

@ -23,7 +23,9 @@ import java.lang.annotation.Annotation;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenInfo;
import org.apache.hadoop.security.token.TokenSelector;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.ClientRMProtocol;
@ -56,7 +58,22 @@ public class ClientRMSecurityInfo extends SecurityInfo {
@Override
public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) {
return null;
}
if (!protocol
.equals(ClientRMProtocol.ClientRMProtocolService.BlockingInterface.class)) {
return null;
}
return new TokenInfo() {
@Override
public Class<? extends Annotation> annotationType() {
return null;
}
@Override
public Class<? extends TokenSelector<? extends TokenIdentifier>>
value() {
return RMTokenSelector.class;
}
};
}
}

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.security.client;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
/**
* Delegation Token Identifier that identifies the delegation tokens from the
* Resource Manager.
*/
@Public
@Evolving
public class RMDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {
public static final Text KIND_NAME = new Text("RM_DELEGATION_TOKEN");
public RMDelegationTokenIdentifier() {
}
/**
* Create a new delegation token identifier
* @param owner the effective username of the token owner
* @param renewer the username of the renewer
* @param realUser the real username of the token owner
*/
public RMDelegationTokenIdentifier(Text owner, Text renewer, Text realUser) {
super(owner, renewer, realUser);
}
@Override
public Text getKind() {
return KIND_NAME;
}
}

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.security.client;
import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenSelector;
public class RMTokenSelector implements
TokenSelector<RMDelegationTokenIdentifier> {
private static final Log LOG = LogFactory
.getLog(RMTokenSelector.class);
@SuppressWarnings("unchecked")
public Token<RMDelegationTokenIdentifier> selectToken(Text service,
Collection<Token<? extends TokenIdentifier>> tokens) {
if (service == null) {
return null;
}
LOG.debug("Looking for a token with service " + service.toString());
for (Token<? extends TokenIdentifier> token : tokens) {
LOG.debug("Token kind is " + token.getKind().toString()
+ " and the token's service name is " + token.getService());
if (RMDelegationTokenIdentifier.KIND_NAME.equals(token.getKind())
&& service.equals(token.getService())) {
return (Token<RMDelegationTokenIdentifier>) token;
}
}
return null;
}
}

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.URL;
@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
/**
* Builder utilities to construct various objects.
@ -221,6 +223,18 @@ public class BuilderUtils {
return container;
}
public static DelegationToken newDelegationToken(
byte[] identifier, String kind, byte[] password,
String service) {
DelegationToken delegationToken = recordFactory.newRecordInstance(
DelegationToken.class);
delegationToken.setIdentifier(ByteBuffer.wrap(identifier));
delegationToken.setKind(kind);
delegationToken.setPassword(ByteBuffer.wrap(password));
delegationToken.setService(service);
return delegationToken;
}
public static ContainerToken newContainerToken(NodeId nodeId,
ByteBuffer password, ContainerTokenIdentifier tokenIdentifier) {
ContainerToken containerToken = recordFactory

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
/**
* A ResourceManager specific delegation token secret manager.
* The secret manager is responsible for generating and accepting the password
* for each token.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class RMDelegationTokenSecretManager
extends AbstractDelegationTokenSecretManager<RMDelegationTokenIdentifier> {
/**
* Create a secret manager
* @param delegationKeyUpdateInterval the number of seconds for rolling new
* secret keys.
* @param delegationTokenMaxLifetime the maximum lifetime of the delegation
* tokens
* @param delegationTokenRenewInterval how often the tokens must be renewed
* @param delegationTokenRemoverScanInterval how often the tokens are scanned
* for expired tokens
*/
public RMDelegationTokenSecretManager(long delegationKeyUpdateInterval,
long delegationTokenMaxLifetime,
long delegationTokenRenewInterval,
long delegationTokenRemoverScanInterval) {
super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
}
@Override
public RMDelegationTokenIdentifier createIdentifier() {
return new RMDelegationTokenIdentifier();
}
}

View File

@ -1,20 +1,20 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
@ -28,12 +28,15 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
@ -43,6 +46,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
@ -53,8 +58,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.NodeReport;
@ -67,6 +72,8 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -78,6 +85,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.BuilderUtils;
/**
@ -97,18 +105,22 @@ public class ClientRMService extends AbstractService implements
private String clientServiceBindAddress;
private Server server;
private RMDelegationTokenSecretManager rmDTSecretManager;
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
InetSocketAddress clientBindAddress;
private final ApplicationACLsManager applicationsACLsManager;
public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
RMAppManager rmAppManager, ApplicationACLsManager applicationACLsManager) {
RMAppManager rmAppManager, ApplicationACLsManager applicationACLsManager,
RMDelegationTokenSecretManager rmDTSecretManager) {
super(ClientRMService.class.getName());
this.scheduler = scheduler;
this.rmContext = rmContext;
this.rmAppManager = rmAppManager;
this.applicationsACLsManager = applicationACLsManager;
this.rmDTSecretManager = rmDTSecretManager;
}
@Override
@ -118,21 +130,19 @@ public class ClientRMService extends AbstractService implements
YarnConfiguration.DEFAULT_RM_ADDRESS);
clientBindAddress =
NetUtils.createSocketAddr(clientServiceBindAddress,
YarnConfiguration.DEFAULT_RM_PORT,
YarnConfiguration.RM_ADDRESS);
YarnConfiguration.DEFAULT_RM_PORT,
YarnConfiguration.RM_ADDRESS);
super.init(conf);
}
@Override
public void start() {
// All the clients to appsManager are supposed to be authenticated via
// Kerberos if security is enabled, so no secretManager.
Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf);
this.server =
rpc.getServer(ClientRMProtocol.class, this,
clientBindAddress,
conf, null,
conf, this.rmDTSecretManager,
conf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT));
@ -423,6 +433,49 @@ public class ClientRMService extends AbstractService implements
return response;
}
@Override
public GetDelegationTokenResponse getDelegationToken(
GetDelegationTokenRequest request) throws YarnRemoteException {
try {
// Verify that the connection is kerberos authenticated
AuthenticationMethod authMethod = UserGroupInformation
.getRealAuthenticationMethod(UserGroupInformation.getCurrentUser());
if (UserGroupInformation.isSecurityEnabled()
&& (authMethod != AuthenticationMethod.KERBEROS)) {
throw new IOException(
"Delegation Token can be issued only with kerberos authentication");
}
GetDelegationTokenResponse response =
recordFactory.newRecordInstance(GetDelegationTokenResponse.class);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
Text owner = new Text(ugi.getUserName());
Text realUser = null;
if (ugi.getRealUser() != null) {
realUser = new Text(ugi.getRealUser().getUserName());
}
RMDelegationTokenIdentifier tokenIdentifier =
new RMDelegationTokenIdentifier(owner, new Text(request.getRenewer()),
realUser);
Token<RMDelegationTokenIdentifier> realRMDTtoken =
new Token<RMDelegationTokenIdentifier>(tokenIdentifier,
this.rmDTSecretManager);
response.setRMDelegationToken(
BuilderUtils.newDelegationToken(
realRMDTtoken.getIdentifier(),
realRMDTtoken.getKind().toString(),
realRMDTtoken.getPassword(),
clientBindAddress.getAddress().getHostAddress() + ":"
+ clientBindAddress.getPort()
));
return response;
} catch(IOException io) {
throw RPCUtil.getRemoteException(io);
}
}
void refreshServiceAcls(Configuration configuration,
PolicyProvider policyProvider) {
this.server.refreshServiceAcl(configuration, policyProvider);

View File

@ -1,20 +1,20 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
@ -41,12 +41,13 @@ import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@ -66,16 +67,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher;
import org.apache.hadoop.yarn.server.webproxy.WebAppProxy;
import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet;
import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
import org.apache.hadoop.yarn.webapp.WebApps.Builder;
import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher;
import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet;
import org.apache.hadoop.yarn.server.webproxy.WebAppProxy;
/**
* The ResourceManager is the main class that is a set of components.
@ -107,7 +108,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
private EventHandler<SchedulerEvent> schedulerDispatcher;
protected RMAppManager rmAppManager;
protected ApplicationACLsManager applicationACLsManager;
protected RMDelegationTokenSecretManager rmDTSecretManager;
private WebApp webApp;
private RMContext rmContext;
private final Store store;
@ -193,7 +194,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
// Register event handler for RMAppManagerEvents
this.rmDispatcher.register(RMAppManagerEventType.class,
this.rmAppManager);
this.rmDTSecretManager = createRMDelegationTokenSecretManager();
clientRM = createClientRMService();
addService(clientRM);
@ -435,7 +436,12 @@ public class ResourceManager extends CompositeService implements Recoverable {
startWepApp();
DefaultMetricsSystem.initialize("ResourceManager");
try {
rmDTSecretManager.startThreads();
} catch(IOException ie) {
throw new YarnException("Failed to start secret manager threads", ie);
}
super.start();
/*synchronized(shutdown) {
@ -459,6 +465,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
if (webApp != null) {
webApp.stop();
}
rmDTSecretManager.stopThreads();
/*synchronized(shutdown) {
shutdown.set(true);
@ -475,9 +482,25 @@ public class ResourceManager extends CompositeService implements Recoverable {
this.nmLivelinessMonitor, this.containerTokenSecretManager);
}
protected RMDelegationTokenSecretManager
createRMDelegationTokenSecretManager() {
long secretKeyInterval =
conf.getLong(YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_KEY,
YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
long tokenMaxLifetime =
conf.getLong(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_KEY,
YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
long tokenRenewInterval =
conf.getLong(YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
return new RMDelegationTokenSecretManager(secretKeyInterval,
tokenMaxLifetime, tokenRenewInterval, 3600000);
}
protected ClientRMService createClientRMService() {
return new ClientRMService(this.rmContext, scheduler, this.rmAppManager,
this.applicationACLsManager);
this.applicationACLsManager, this.rmDTSecretManager);
}
protected ApplicationMasterService createApplicationMasterService() {

View File

@ -178,7 +178,7 @@ public class MockRM extends ResourceManager {
@Override
protected ClientRMService createClientRMService() {
return new ClientRMService(getRMContext(), getResourceScheduler(),
rmAppManager, applicationACLsManager) {
rmAppManager, applicationACLsManager, null) {
@Override
public void start() {
// override to not start rpc handler

View File

@ -89,7 +89,7 @@ public class TestApplicationACLs {
resourceManager = new MockRM(conf) {
protected ClientRMService createClientRMService() {
return new ClientRMService(getRMContext(), this.scheduler,
this.rmAppManager, this.applicationACLsManager);
this.rmAppManager, this.applicationACLsManager, null);
};
};
new Thread() {