YARN-50. Implement renewal / cancellation of Delegation Tokens(Siddharth Seth via tgraves)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1429085 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
78ab699fe9
commit
dd8fc7e499
|
@ -270,6 +270,9 @@ Release 0.23.6 - UNRELEASED
|
||||||
YARN-293. Node Manager leaks LocalizerRunner object for every Container
|
YARN-293. Node Manager leaks LocalizerRunner object for every Container
|
||||||
(Robert Joseph Evans via jlowe)
|
(Robert Joseph Evans via jlowe)
|
||||||
|
|
||||||
|
YARN-50. Implement renewal / cancellation of Delegation Tokens
|
||||||
|
(Siddharth Seth via tgraves)
|
||||||
|
|
||||||
Release 0.23.5 - UNRELEASED
|
Release 0.23.5 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -18,13 +18,11 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.api;
|
package org.apache.hadoop.yarn.api;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||||
|
@ -33,19 +31,25 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
|
||||||
import org.apache.hadoop.yarn.api.records.DelegationToken;
|
import org.apache.hadoop.yarn.api.records.DelegationToken;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||||
|
@ -265,4 +269,26 @@ public interface ClientRMProtocol {
|
||||||
public GetDelegationTokenResponse getDelegationToken(
|
public GetDelegationTokenResponse getDelegationToken(
|
||||||
GetDelegationTokenRequest request)
|
GetDelegationTokenRequest request)
|
||||||
throws YarnRemoteException;
|
throws YarnRemoteException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Renew an existing delegation token.
|
||||||
|
*
|
||||||
|
* @param request the delegation token to be renewed.
|
||||||
|
* @return the new expiry time for the delegation token.
|
||||||
|
* @throws YarnRemoteException
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
public RenewDelegationTokenResponse renewDelegationToken(
|
||||||
|
RenewDelegationTokenRequest request) throws YarnRemoteException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cancel an existing delegation token.
|
||||||
|
*
|
||||||
|
* @param request the delegation token to be cancelled.
|
||||||
|
* @return an empty response.
|
||||||
|
* @throws YarnRemoteException
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
public CancelDelegationTokenResponse cancelDelegationToken(
|
||||||
|
CancelDelegationTokenRequest request) throws YarnRemoteException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,34 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.api.protocolrecords;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||||
|
import org.apache.hadoop.yarn.api.records.DelegationToken;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The request issued by the client to the {@code ResourceManager} to cancel a
|
||||||
|
* delegation token.
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Evolving
|
||||||
|
public interface CancelDelegationTokenRequest {
|
||||||
|
DelegationToken getDelegationToken();
|
||||||
|
void setDelegationToken(DelegationToken dToken);
|
||||||
|
}
|
|
@ -0,0 +1,31 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.api.protocolrecords;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The response from the {@code ResourceManager} to a cancelDelegationToken
|
||||||
|
* request.
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Evolving
|
||||||
|
public interface CancelDelegationTokenResponse {
|
||||||
|
}
|
|
@ -0,0 +1,34 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.api.protocolrecords;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||||
|
import org.apache.hadoop.yarn.api.records.DelegationToken;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The request issued by the client to renew a delegation token from
|
||||||
|
* the {@code ResourceManager}.
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Evolving
|
||||||
|
public interface RenewDelegationTokenRequest {
|
||||||
|
DelegationToken getDelegationToken();
|
||||||
|
void setDelegationToken(DelegationToken dToken);
|
||||||
|
}
|
|
@ -0,0 +1,32 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.api.protocolrecords;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The response to a renewDelegationToken call to the {@code ResourceManager}.
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Evolving
|
||||||
|
public interface RenewDelegationTokenResponse {
|
||||||
|
long getNextExpirationTime();
|
||||||
|
void setNextExpirationTime(long expTime);
|
||||||
|
}
|
|
@ -0,0 +1,106 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.DelegationToken;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
||||||
|
import org.apache.hadoop.yarn.api.records.impl.pb.DelegationTokenPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnProtos.DelegationTokenProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.CancelDelegationTokenRequestProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.CancelDelegationTokenRequestProtoOrBuilder;
|
||||||
|
|
||||||
|
public class CancelDelegationTokenRequestPBImpl extends
|
||||||
|
ProtoBase<CancelDelegationTokenRequestProto> implements
|
||||||
|
CancelDelegationTokenRequest {
|
||||||
|
|
||||||
|
CancelDelegationTokenRequestProto proto = CancelDelegationTokenRequestProto
|
||||||
|
.getDefaultInstance();
|
||||||
|
CancelDelegationTokenRequestProto.Builder builder = null;
|
||||||
|
boolean viaProto = false;
|
||||||
|
|
||||||
|
public CancelDelegationTokenRequestPBImpl() {
|
||||||
|
builder = CancelDelegationTokenRequestProto.newBuilder();
|
||||||
|
}
|
||||||
|
|
||||||
|
public CancelDelegationTokenRequestPBImpl(
|
||||||
|
CancelDelegationTokenRequestProto proto) {
|
||||||
|
this.proto = proto;
|
||||||
|
viaProto = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
DelegationToken token;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DelegationToken getDelegationToken() {
|
||||||
|
CancelDelegationTokenRequestProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
if (this.token != null) {
|
||||||
|
return this.token;
|
||||||
|
}
|
||||||
|
if (!p.hasDelegationToken()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
this.token = convertFromProtoFormat(p.getDelegationToken());
|
||||||
|
return this.token;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setDelegationToken(DelegationToken token) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
if (token == null)
|
||||||
|
builder.clearDelegationToken();
|
||||||
|
this.token = token;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CancelDelegationTokenRequestProto getProto() {
|
||||||
|
mergeLocalToProto();
|
||||||
|
proto = viaProto ? proto : builder.build();
|
||||||
|
viaProto = true;
|
||||||
|
return proto;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void mergeLocalToBuilder() {
|
||||||
|
if (token != null) {
|
||||||
|
builder.setDelegationToken(convertToProtoFormat(this.token));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void mergeLocalToProto() {
|
||||||
|
if (viaProto)
|
||||||
|
maybeInitBuilder();
|
||||||
|
mergeLocalToBuilder();
|
||||||
|
proto = builder.build();
|
||||||
|
viaProto = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void maybeInitBuilder() {
|
||||||
|
if (viaProto || builder == null) {
|
||||||
|
builder = CancelDelegationTokenRequestProto.newBuilder(proto);
|
||||||
|
}
|
||||||
|
viaProto = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private DelegationTokenPBImpl convertFromProtoFormat(DelegationTokenProto p) {
|
||||||
|
return new DelegationTokenPBImpl(p);
|
||||||
|
}
|
||||||
|
|
||||||
|
private DelegationTokenProto convertToProtoFormat(DelegationToken t) {
|
||||||
|
return ((DelegationTokenPBImpl) t).getProto();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,44 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.CancelDelegationTokenResponseProto;
|
||||||
|
|
||||||
|
public class CancelDelegationTokenResponsePBImpl extends
|
||||||
|
ProtoBase<CancelDelegationTokenResponseProto> implements
|
||||||
|
CancelDelegationTokenResponse {
|
||||||
|
|
||||||
|
CancelDelegationTokenResponseProto proto = CancelDelegationTokenResponseProto
|
||||||
|
.getDefaultInstance();
|
||||||
|
|
||||||
|
public CancelDelegationTokenResponsePBImpl() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public CancelDelegationTokenResponsePBImpl(
|
||||||
|
CancelDelegationTokenResponseProto proto) {
|
||||||
|
this.proto = proto;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CancelDelegationTokenResponseProto getProto() {
|
||||||
|
return proto;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,107 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.DelegationToken;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
||||||
|
import org.apache.hadoop.yarn.api.records.impl.pb.DelegationTokenPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnProtos.DelegationTokenProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RenewDelegationTokenRequestProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RenewDelegationTokenRequestProtoOrBuilder;
|
||||||
|
|
||||||
|
public class RenewDelegationTokenRequestPBImpl extends
|
||||||
|
ProtoBase<RenewDelegationTokenRequestProto> implements
|
||||||
|
RenewDelegationTokenRequest {
|
||||||
|
RenewDelegationTokenRequestProto proto =
|
||||||
|
RenewDelegationTokenRequestProto.getDefaultInstance();
|
||||||
|
RenewDelegationTokenRequestProto.Builder builder = null;
|
||||||
|
boolean viaProto = false;
|
||||||
|
|
||||||
|
public RenewDelegationTokenRequestPBImpl() {
|
||||||
|
builder = RenewDelegationTokenRequestProto.newBuilder();
|
||||||
|
}
|
||||||
|
|
||||||
|
public RenewDelegationTokenRequestPBImpl (
|
||||||
|
RenewDelegationTokenRequestProto proto) {
|
||||||
|
this.proto = proto;
|
||||||
|
this.viaProto = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
DelegationToken token;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DelegationToken getDelegationToken() {
|
||||||
|
RenewDelegationTokenRequestProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
if (this.token != null) {
|
||||||
|
return this.token;
|
||||||
|
}
|
||||||
|
if (!p.hasDelegationToken()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
this.token = convertFromProtoFormat(p.getDelegationToken());
|
||||||
|
return this.token;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setDelegationToken(DelegationToken token) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
if (token == null)
|
||||||
|
builder.clearDelegationToken();
|
||||||
|
this.token = token;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RenewDelegationTokenRequestProto getProto() {
|
||||||
|
mergeLocalToProto();
|
||||||
|
proto = viaProto ? proto : builder.build();
|
||||||
|
viaProto = true;
|
||||||
|
return proto;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void mergeLocalToBuilder() {
|
||||||
|
if (token != null) {
|
||||||
|
builder.setDelegationToken(convertToProtoFormat(this.token));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void mergeLocalToProto() {
|
||||||
|
if (viaProto)
|
||||||
|
maybeInitBuilder();
|
||||||
|
mergeLocalToBuilder();
|
||||||
|
proto = builder.build();
|
||||||
|
viaProto = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void maybeInitBuilder() {
|
||||||
|
if (viaProto || builder == null) {
|
||||||
|
builder = RenewDelegationTokenRequestProto.newBuilder(proto);
|
||||||
|
}
|
||||||
|
viaProto = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private DelegationTokenPBImpl convertFromProtoFormat(DelegationTokenProto p) {
|
||||||
|
return new DelegationTokenPBImpl(p);
|
||||||
|
}
|
||||||
|
|
||||||
|
private DelegationTokenProto convertToProtoFormat(DelegationToken t) {
|
||||||
|
return ((DelegationTokenPBImpl)t).getProto();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,69 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RenewDelegationTokenResponseProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RenewDelegationTokenResponseProtoOrBuilder;
|
||||||
|
|
||||||
|
public class RenewDelegationTokenResponsePBImpl extends
|
||||||
|
ProtoBase<RenewDelegationTokenResponseProto> implements
|
||||||
|
RenewDelegationTokenResponse {
|
||||||
|
|
||||||
|
RenewDelegationTokenResponseProto proto =
|
||||||
|
RenewDelegationTokenResponseProto.getDefaultInstance();
|
||||||
|
RenewDelegationTokenResponseProto.Builder builder = null;
|
||||||
|
boolean viaProto = false;
|
||||||
|
|
||||||
|
public RenewDelegationTokenResponsePBImpl() {
|
||||||
|
this.builder = RenewDelegationTokenResponseProto.newBuilder();
|
||||||
|
}
|
||||||
|
|
||||||
|
public RenewDelegationTokenResponsePBImpl (
|
||||||
|
RenewDelegationTokenResponseProto proto) {
|
||||||
|
this.proto = proto;
|
||||||
|
this.viaProto = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RenewDelegationTokenResponseProto getProto() {
|
||||||
|
proto = viaProto ? proto : builder.build();
|
||||||
|
viaProto = true;
|
||||||
|
return proto;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void maybeInitBuilder() {
|
||||||
|
if (viaProto || builder == null) {
|
||||||
|
builder = RenewDelegationTokenResponseProto.newBuilder(proto);
|
||||||
|
}
|
||||||
|
viaProto = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getNextExpirationTime() {
|
||||||
|
RenewDelegationTokenResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
return p.getNextExpiryTs();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setNextExpirationTime(long expTime) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
builder.setNextExpiryTs(expTime);
|
||||||
|
}
|
||||||
|
}
|
|
@ -34,5 +34,7 @@ service ClientRMProtocolService {
|
||||||
rpc getQueueInfo (GetQueueInfoRequestProto) returns (GetQueueInfoResponseProto);
|
rpc getQueueInfo (GetQueueInfoRequestProto) returns (GetQueueInfoResponseProto);
|
||||||
rpc getQueueUserAcls (GetQueueUserAclsInfoRequestProto) returns (GetQueueUserAclsInfoResponseProto);
|
rpc getQueueUserAcls (GetQueueUserAclsInfoRequestProto) returns (GetQueueUserAclsInfoResponseProto);
|
||||||
rpc getDelegationToken(GetDelegationTokenRequestProto) returns (GetDelegationTokenResponseProto);
|
rpc getDelegationToken(GetDelegationTokenRequestProto) returns (GetDelegationTokenResponseProto);
|
||||||
|
rpc renewDelegationToken(RenewDelegationTokenRequestProto) returns (RenewDelegationTokenResponseProto);
|
||||||
|
rpc cancelDelegationToken(CancelDelegationTokenRequestProto) returns (CancelDelegationTokenResponseProto);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -147,6 +147,22 @@ message GetDelegationTokenResponseProto {
|
||||||
optional DelegationTokenProto application_token = 1;
|
optional DelegationTokenProto application_token = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message RenewDelegationTokenRequestProto {
|
||||||
|
required DelegationTokenProto delegation_token = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message RenewDelegationTokenResponseProto {
|
||||||
|
required int64 next_expiry_ts = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message CancelDelegationTokenRequestProto {
|
||||||
|
required DelegationTokenProto delegation_token = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message CancelDelegationTokenResponseProto {
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
//////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////
|
||||||
/////// client_NM_Protocol ///////////////////////////
|
/////// client_NM_Protocol ///////////////////////////
|
||||||
//////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////
|
||||||
|
|
|
@ -25,11 +25,13 @@ import java.util.List;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||||
|
@ -45,6 +47,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
|
@ -72,7 +76,12 @@ public class YarnClientImpl extends AbstractService implements YarnClient {
|
||||||
private static final String ROOT = "root";
|
private static final String ROOT = "root";
|
||||||
|
|
||||||
public YarnClientImpl() {
|
public YarnClientImpl() {
|
||||||
|
this(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public YarnClientImpl(InetSocketAddress rmAddress) {
|
||||||
super(YarnClientImpl.class.getName());
|
super(YarnClientImpl.class.getName());
|
||||||
|
this.rmAddress = rmAddress;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static InetSocketAddress getRmAddress(Configuration conf) {
|
private static InetSocketAddress getRmAddress(Configuration conf) {
|
||||||
|
@ -82,7 +91,9 @@ public class YarnClientImpl extends AbstractService implements YarnClient {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void init(Configuration conf) {
|
public synchronized void init(Configuration conf) {
|
||||||
this.rmAddress = getRmAddress(conf);
|
if (this.rmAddress == null) {
|
||||||
|
this.rmAddress = getRmAddress(conf);
|
||||||
|
}
|
||||||
super.init(conf);
|
super.init(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -90,16 +101,19 @@ public class YarnClientImpl extends AbstractService implements YarnClient {
|
||||||
public synchronized void start() {
|
public synchronized void start() {
|
||||||
YarnRPC rpc = YarnRPC.create(getConfig());
|
YarnRPC rpc = YarnRPC.create(getConfig());
|
||||||
|
|
||||||
this.rmClient =
|
this.rmClient = (ClientRMProtocol) rpc.getProxy(
|
||||||
(ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, rmAddress,
|
ClientRMProtocol.class, rmAddress, getConfig());
|
||||||
getConfig());
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Connecting to ResourceManager at " + rmAddress);
|
LOG.debug("Connecting to ResourceManager at " + rmAddress);
|
||||||
|
}
|
||||||
super.start();
|
super.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void stop() {
|
public synchronized void stop() {
|
||||||
RPC.stopProxy(this.rmClient);
|
if (this.rmClient != null) {
|
||||||
|
RPC.stopProxy(this.rmClient);
|
||||||
|
}
|
||||||
super.stop();
|
super.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -184,6 +198,31 @@ public class YarnClientImpl extends AbstractService implements YarnClient {
|
||||||
return response.getRMDelegationToken();
|
return response.getRMDelegationToken();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Not part of YarnClient itself. Placed in YarnClientImpl while renew/cancel
|
||||||
|
// are part of ClientRMProtocol.
|
||||||
|
@Private
|
||||||
|
public long renewRMDelegationToken(DelegationToken rmToken)
|
||||||
|
throws YarnRemoteException {
|
||||||
|
RenewDelegationTokenRequest request = Records
|
||||||
|
.newRecord(RenewDelegationTokenRequest.class);
|
||||||
|
request.setDelegationToken(rmToken);
|
||||||
|
RenewDelegationTokenResponse response = rmClient
|
||||||
|
.renewDelegationToken(request);
|
||||||
|
return response.getNextExpirationTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Not part of YarnClient itself. Placed in YarnClientImpl while renew/cancel
|
||||||
|
// are part of ClietnRMProtocol
|
||||||
|
@Private
|
||||||
|
public void cancelRMDelegationToken(DelegationToken rmToken)
|
||||||
|
throws YarnRemoteException {
|
||||||
|
CancelDelegationTokenRequest request = Records
|
||||||
|
.newRecord(CancelDelegationTokenRequest.class);
|
||||||
|
request.setDelegationToken(rmToken);
|
||||||
|
rmClient.cancelDelegationToken(request);
|
||||||
|
}
|
||||||
|
|
||||||
private GetQueueInfoRequest
|
private GetQueueInfoRequest
|
||||||
getQueueInfoRequest(String queueName, boolean includeApplications,
|
getQueueInfoRequest(String queueName, boolean includeApplications,
|
||||||
boolean includeChildQueues, boolean recursive) {
|
boolean includeChildQueues, boolean recursive) {
|
||||||
|
|
|
@ -0,0 +1,83 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.security;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.security.token.TokenRenewer;
|
||||||
|
import org.apache.hadoop.yarn.api.records.DelegationToken;
|
||||||
|
import org.apache.hadoop.yarn.client.YarnClientImpl;
|
||||||
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
|
|
||||||
|
public class RMDelegationTokenRenewer extends TokenRenewer {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean handleKind(Text kind) {
|
||||||
|
return RMDelegationTokenIdentifier.KIND_NAME.equals(kind);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isManaged(Token<?> token) throws IOException {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long renew(Token<?> token, Configuration conf) throws IOException,
|
||||||
|
InterruptedException {
|
||||||
|
YarnClientImpl yarnClient = getYarnClient(conf,
|
||||||
|
SecurityUtil.getTokenServiceAddr(token));
|
||||||
|
try {
|
||||||
|
DelegationToken dToken = BuilderUtils.newDelegationToken(
|
||||||
|
token.getIdentifier(), token.getKind().toString(),
|
||||||
|
token.getPassword(), token.getService().toString());
|
||||||
|
return yarnClient.renewRMDelegationToken(dToken);
|
||||||
|
} finally {
|
||||||
|
yarnClient.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void cancel(Token<?> token, Configuration conf) throws IOException,
|
||||||
|
InterruptedException {
|
||||||
|
YarnClientImpl yarnClient = getYarnClient(conf,
|
||||||
|
SecurityUtil.getTokenServiceAddr(token));
|
||||||
|
try {
|
||||||
|
DelegationToken dToken = BuilderUtils.newDelegationToken(
|
||||||
|
token.getIdentifier(), token.getKind().toString(),
|
||||||
|
token.getPassword(), token.getService().toString());
|
||||||
|
yarnClient.cancelRMDelegationToken(dToken);
|
||||||
|
return;
|
||||||
|
} finally {
|
||||||
|
yarnClient.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private YarnClientImpl getYarnClient(Configuration conf,
|
||||||
|
InetSocketAddress rmAddress) {
|
||||||
|
YarnClientImpl yarnClient = new YarnClientImpl(rmAddress);
|
||||||
|
yarnClient.init(conf);
|
||||||
|
yarnClient.start();
|
||||||
|
return yarnClient;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1 @@
|
||||||
|
org.apache.hadoop.yarn.security.RMDelegationTokenRenewer;
|
|
@ -27,6 +27,8 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
||||||
import org.apache.hadoop.yarn.api.ClientRMProtocolPB;
|
import org.apache.hadoop.yarn.api.ClientRMProtocolPB;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||||
|
@ -45,8 +47,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenResponsePBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsRequestPBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsRequestPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsResponsePBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsResponsePBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationReportRequestPBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationReportRequestPBImpl;
|
||||||
|
@ -65,10 +71,13 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoRe
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoResponsePBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoResponsePBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationRequestPBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationRequestPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationResponsePBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationResponsePBImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenRequestPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenResponsePBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||||
import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
|
import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.CancelDelegationTokenRequestProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsRequestProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsRequestProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationReportRequestProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationReportRequestProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterMetricsRequestProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterMetricsRequestProto;
|
||||||
|
@ -78,6 +87,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNewApplicationRequestPr
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueInfoRequestProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueInfoRequestProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoRequestProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoRequestProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationRequestProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationRequestProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RenewDelegationTokenRequestProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto;
|
||||||
|
|
||||||
import com.google.protobuf.ServiceException;
|
import com.google.protobuf.ServiceException;
|
||||||
|
@ -233,4 +243,31 @@ public class ClientRMProtocolPBClientImpl implements ClientRMProtocol,
|
||||||
throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
|
throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RenewDelegationTokenResponse renewDelegationToken(
|
||||||
|
RenewDelegationTokenRequest request) throws YarnRemoteException {
|
||||||
|
RenewDelegationTokenRequestProto requestProto =
|
||||||
|
((RenewDelegationTokenRequestPBImpl) request).getProto();
|
||||||
|
try {
|
||||||
|
return new RenewDelegationTokenResponsePBImpl(proxy.renewDelegationToken(
|
||||||
|
null, requestProto));
|
||||||
|
} catch (ServiceException e) {
|
||||||
|
throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CancelDelegationTokenResponse cancelDelegationToken(
|
||||||
|
CancelDelegationTokenRequest request) throws YarnRemoteException {
|
||||||
|
CancelDelegationTokenRequestProto requestProto =
|
||||||
|
((CancelDelegationTokenRequestPBImpl) request).getProto();
|
||||||
|
try {
|
||||||
|
return new CancelDelegationTokenResponsePBImpl(
|
||||||
|
proxy.cancelDelegationToken(null, requestProto));
|
||||||
|
|
||||||
|
} catch (ServiceException e) {
|
||||||
|
throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.api.impl.pb.service;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
||||||
import org.apache.hadoop.yarn.api.ClientRMProtocolPB;
|
import org.apache.hadoop.yarn.api.ClientRMProtocolPB;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
|
||||||
|
@ -29,7 +30,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenResponsePBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsRequestPBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsRequestPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsResponsePBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsResponsePBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationReportRequestPBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationReportRequestPBImpl;
|
||||||
|
@ -48,9 +52,13 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoRe
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoResponsePBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoResponsePBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationRequestPBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationRequestPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationResponsePBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationResponsePBImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenRequestPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenResponsePBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.CancelDelegationTokenRequestProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.CancelDelegationTokenResponseProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsRequestProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsRequestProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsResponseProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsResponseProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationReportRequestProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationReportRequestProto;
|
||||||
|
@ -69,6 +77,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoReques
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoResponseProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoResponseProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationRequestProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationRequestProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RenewDelegationTokenRequestProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RenewDelegationTokenResponseProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto;
|
||||||
|
|
||||||
|
@ -212,4 +222,32 @@ public class ClientRMProtocolPBServiceImpl implements ClientRMProtocolPB {
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RenewDelegationTokenResponseProto renewDelegationToken(
|
||||||
|
RpcController controller, RenewDelegationTokenRequestProto proto)
|
||||||
|
throws ServiceException {
|
||||||
|
RenewDelegationTokenRequestPBImpl request =
|
||||||
|
new RenewDelegationTokenRequestPBImpl(proto);
|
||||||
|
try {
|
||||||
|
RenewDelegationTokenResponse response = real.renewDelegationToken(request);
|
||||||
|
return ((RenewDelegationTokenResponsePBImpl)response).getProto();
|
||||||
|
} catch (YarnRemoteException e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CancelDelegationTokenResponseProto cancelDelegationToken(
|
||||||
|
RpcController controller, CancelDelegationTokenRequestProto proto)
|
||||||
|
throws ServiceException {
|
||||||
|
CancelDelegationTokenRequestPBImpl request =
|
||||||
|
new CancelDelegationTokenRequestPBImpl(proto);
|
||||||
|
try {
|
||||||
|
CancelDelegationTokenResponse response = real.cancelDelegationToken(request);
|
||||||
|
return ((CancelDelegationTokenResponsePBImpl)response).getProto();
|
||||||
|
} catch (YarnRemoteException e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
|
||||||
import java.security.AccessControlException;
|
import java.security.AccessControlException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.EnumSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
@ -39,6 +40,8 @@ import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.util.ExitUtil;
|
import org.apache.hadoop.util.ExitUtil;
|
||||||
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||||
|
@ -57,12 +60,15 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.api.records.DelegationToken;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
@ -87,6 +93,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicy
|
||||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
import org.apache.hadoop.yarn.service.AbstractService;
|
import org.apache.hadoop.yarn.service.AbstractService;
|
||||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -105,7 +112,7 @@ public class ClientRMService extends AbstractService implements
|
||||||
private final RMAppManager rmAppManager;
|
private final RMAppManager rmAppManager;
|
||||||
|
|
||||||
private Server server;
|
private Server server;
|
||||||
private RMDelegationTokenSecretManager rmDTSecretManager;
|
protected RMDelegationTokenSecretManager rmDTSecretManager;
|
||||||
|
|
||||||
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
||||||
InetSocketAddress clientBindAddress;
|
InetSocketAddress clientBindAddress;
|
||||||
|
@ -122,16 +129,13 @@ public class ClientRMService extends AbstractService implements
|
||||||
this.applicationsACLsManager = applicationACLsManager;
|
this.applicationsACLsManager = applicationACLsManager;
|
||||||
this.rmDTSecretManager = rmDTSecretManager;
|
this.rmDTSecretManager = rmDTSecretManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(Configuration conf) {
|
public void init(Configuration conf) {
|
||||||
clientBindAddress = conf.getSocketAddr(
|
clientBindAddress = getBindAddress(conf);
|
||||||
YarnConfiguration.RM_ADDRESS,
|
|
||||||
YarnConfiguration.DEFAULT_RM_ADDRESS,
|
|
||||||
YarnConfiguration.DEFAULT_RM_PORT);
|
|
||||||
super.init(conf);
|
super.init(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void start() {
|
public void start() {
|
||||||
Configuration conf = getConfig();
|
Configuration conf = getConfig();
|
||||||
|
@ -156,6 +160,20 @@ public class ClientRMService extends AbstractService implements
|
||||||
super.start();
|
super.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop() {
|
||||||
|
if (this.server != null) {
|
||||||
|
this.server.stop();
|
||||||
|
}
|
||||||
|
super.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
InetSocketAddress getBindAddress(Configuration conf) {
|
||||||
|
return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_ADDRESS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_PORT);
|
||||||
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public InetSocketAddress getBindAddress() {
|
public InetSocketAddress getBindAddress() {
|
||||||
return clientBindAddress;
|
return clientBindAddress;
|
||||||
|
@ -455,10 +473,7 @@ public class ClientRMService extends AbstractService implements
|
||||||
try {
|
try {
|
||||||
|
|
||||||
// Verify that the connection is kerberos authenticated
|
// Verify that the connection is kerberos authenticated
|
||||||
AuthenticationMethod authMethod = UserGroupInformation
|
if (!isAllowedDelegationTokenOp()) {
|
||||||
.getRealAuthenticationMethod(UserGroupInformation.getCurrentUser());
|
|
||||||
if (UserGroupInformation.isSecurityEnabled()
|
|
||||||
&& (authMethod != AuthenticationMethod.KERBEROS)) {
|
|
||||||
throw new IOException(
|
throw new IOException(
|
||||||
"Delegation Token can be issued only with kerberos authentication");
|
"Delegation Token can be issued only with kerberos authentication");
|
||||||
}
|
}
|
||||||
|
@ -490,17 +505,66 @@ public class ClientRMService extends AbstractService implements
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RenewDelegationTokenResponse renewDelegationToken(
|
||||||
|
RenewDelegationTokenRequest request) throws YarnRemoteException {
|
||||||
|
try {
|
||||||
|
if (!isAllowedDelegationTokenOp()) {
|
||||||
|
throw new IOException(
|
||||||
|
"Delegation Token can be renewed only with kerberos authentication");
|
||||||
|
}
|
||||||
|
|
||||||
|
DelegationToken protoToken = request.getDelegationToken();
|
||||||
|
Token<RMDelegationTokenIdentifier> token = new Token<RMDelegationTokenIdentifier>(
|
||||||
|
protoToken.getIdentifier().array(), protoToken.getPassword().array(),
|
||||||
|
new Text(protoToken.getKind()), new Text(protoToken.getService()));
|
||||||
|
|
||||||
|
String user = UserGroupInformation.getCurrentUser().getShortUserName();
|
||||||
|
long nextExpTime = rmDTSecretManager.renewToken(token, user);
|
||||||
|
RenewDelegationTokenResponse renewResponse = Records
|
||||||
|
.newRecord(RenewDelegationTokenResponse.class);
|
||||||
|
renewResponse.setNextExpirationTime(nextExpTime);
|
||||||
|
return renewResponse;
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw RPCUtil.getRemoteException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CancelDelegationTokenResponse cancelDelegationToken(
|
||||||
|
CancelDelegationTokenRequest request) throws YarnRemoteException {
|
||||||
|
try {
|
||||||
|
if (!isAllowedDelegationTokenOp()) {
|
||||||
|
throw new IOException(
|
||||||
|
"Delegation Token can be cancelled only with kerberos authentication");
|
||||||
|
}
|
||||||
|
DelegationToken protoToken = request.getDelegationToken();
|
||||||
|
Token<RMDelegationTokenIdentifier> token = new Token<RMDelegationTokenIdentifier>(
|
||||||
|
protoToken.getIdentifier().array(), protoToken.getPassword().array(),
|
||||||
|
new Text(protoToken.getKind()), new Text(protoToken.getService()));
|
||||||
|
|
||||||
|
String user = UserGroupInformation.getCurrentUser().getShortUserName();
|
||||||
|
rmDTSecretManager.cancelToken(token, user);
|
||||||
|
return Records.newRecord(CancelDelegationTokenResponse.class);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw RPCUtil.getRemoteException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void refreshServiceAcls(Configuration configuration,
|
void refreshServiceAcls(Configuration configuration,
|
||||||
PolicyProvider policyProvider) {
|
PolicyProvider policyProvider) {
|
||||||
this.server.refreshServiceAcl(configuration, policyProvider);
|
this.server.refreshServiceAcl(configuration, policyProvider);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
private boolean isAllowedDelegationTokenOp() throws IOException {
|
||||||
public void stop() {
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
if (this.server != null) {
|
return EnumSet.of(AuthenticationMethod.KERBEROS,
|
||||||
this.server.stop();
|
AuthenticationMethod.KERBEROS_SSL,
|
||||||
|
AuthenticationMethod.CERTIFICATE)
|
||||||
|
.contains(UserGroupInformation.getCurrentUser()
|
||||||
|
.getRealAuthenticationMethod());
|
||||||
|
} else {
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
super.stop();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,10 +47,10 @@ import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
|
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||||
|
@ -66,8 +66,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
|
||||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
|
|
|
@ -0,0 +1,320 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.mockito.Mockito.doReturn;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.lang.reflect.UndeclaredThrowableException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.security.PrivilegedAction;
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
|
||||||
|
import junit.framework.Assert;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
|
import org.apache.hadoop.ipc.RPC;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
||||||
|
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.DelegationToken;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||||
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
|
import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
|
import org.apache.hadoop.yarn.util.ProtoUtils;
|
||||||
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
|
||||||
|
public class TestClientRMTokens {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(TestClientRMTokens.class);
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDelegationToken() throws IOException, InterruptedException {
|
||||||
|
|
||||||
|
final YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
conf.set(YarnConfiguration.RM_PRINCIPAL, "testuser/localhost@apache.org");
|
||||||
|
|
||||||
|
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
|
||||||
|
UserGroupInformation.setConfiguration(conf);
|
||||||
|
|
||||||
|
ResourceScheduler scheduler = createMockScheduler(conf);
|
||||||
|
|
||||||
|
long initialInterval = 10000l;
|
||||||
|
long maxLifetime= 20000l;
|
||||||
|
long renewInterval = 10000l;
|
||||||
|
|
||||||
|
RMDelegationTokenSecretManager rmDtSecretManager = createRMDelegationTokenSecretManager(
|
||||||
|
initialInterval, maxLifetime, renewInterval);
|
||||||
|
rmDtSecretManager.startThreads();
|
||||||
|
LOG.info("Creating DelegationTokenSecretManager with initialInterval: "
|
||||||
|
+ initialInterval + ", maxLifetime: " + maxLifetime
|
||||||
|
+ ", renewInterval: " + renewInterval);
|
||||||
|
|
||||||
|
final ClientRMService clientRMService = new ClientRMServiceForTest(conf,
|
||||||
|
scheduler, rmDtSecretManager);
|
||||||
|
clientRMService.init(conf);
|
||||||
|
clientRMService.start();
|
||||||
|
|
||||||
|
ClientRMProtocol clientRMWithDT = null;
|
||||||
|
try {
|
||||||
|
|
||||||
|
// Create a user for the renewr and fake the authentication-method
|
||||||
|
UserGroupInformation loggedInUser = UserGroupInformation
|
||||||
|
.createRemoteUser("testrenewer@APACHE.ORG");
|
||||||
|
Assert.assertEquals("testrenewer", loggedInUser.getShortUserName());
|
||||||
|
// Default realm is APACHE.ORG
|
||||||
|
loggedInUser.setAuthenticationMethod(AuthenticationMethod.KERBEROS);
|
||||||
|
|
||||||
|
|
||||||
|
DelegationToken token = getDelegationToken(loggedInUser, clientRMService,
|
||||||
|
loggedInUser.getShortUserName());
|
||||||
|
long tokenFetchTime = System.currentTimeMillis();
|
||||||
|
LOG.info("Got delegation token at: " + tokenFetchTime);
|
||||||
|
|
||||||
|
// Now try talking to RMService using the delegation token
|
||||||
|
clientRMWithDT = getClientRMProtocolWithDT(token,
|
||||||
|
clientRMService.getBindAddress(), "loginuser1", conf);
|
||||||
|
|
||||||
|
GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class);
|
||||||
|
|
||||||
|
try {
|
||||||
|
clientRMWithDT.getNewApplication(request);
|
||||||
|
} catch (UndeclaredThrowableException e) {
|
||||||
|
fail("Unexpected exception" + e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Renew after 50% of token age.
|
||||||
|
while(System.currentTimeMillis() < tokenFetchTime + initialInterval / 2) {
|
||||||
|
Thread.sleep(500l);
|
||||||
|
}
|
||||||
|
long nextExpTime = renewDelegationToken(loggedInUser, clientRMService, token);
|
||||||
|
long renewalTime = System.currentTimeMillis();
|
||||||
|
LOG.info("Renewed token at: " + renewalTime + ", NextExpiryTime: "
|
||||||
|
+ nextExpTime);
|
||||||
|
|
||||||
|
// Wait for first expiry, but before renewed expiry.
|
||||||
|
while (System.currentTimeMillis() > tokenFetchTime + initialInterval
|
||||||
|
&& System.currentTimeMillis() < nextExpTime) {
|
||||||
|
Thread.sleep(500l);
|
||||||
|
}
|
||||||
|
Thread.sleep(50l);
|
||||||
|
|
||||||
|
// Valid token because of renewal.
|
||||||
|
try {
|
||||||
|
clientRMWithDT.getNewApplication(request);
|
||||||
|
} catch (UndeclaredThrowableException e) {
|
||||||
|
fail("Unexpected exception" + e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for expiry.
|
||||||
|
while(System.currentTimeMillis() < renewalTime + renewInterval) {
|
||||||
|
Thread.sleep(500l);
|
||||||
|
}
|
||||||
|
Thread.sleep(50l);
|
||||||
|
LOG.info("At time: " + System.currentTimeMillis() + ", token should be invalid");
|
||||||
|
// Token should have expired.
|
||||||
|
try {
|
||||||
|
clientRMWithDT.getNewApplication(request);
|
||||||
|
fail("Should not have succeeded with an expired token");
|
||||||
|
} catch (UndeclaredThrowableException e) {
|
||||||
|
assertTrue(e.getCause().getMessage().contains("is expired"));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test cancellation
|
||||||
|
// Stop the existing proxy, start another.
|
||||||
|
if (clientRMWithDT != null) {
|
||||||
|
RPC.stopProxy(clientRMWithDT);
|
||||||
|
clientRMWithDT = null;
|
||||||
|
}
|
||||||
|
token = getDelegationToken(loggedInUser, clientRMService,
|
||||||
|
loggedInUser.getShortUserName());
|
||||||
|
tokenFetchTime = System.currentTimeMillis();
|
||||||
|
LOG.info("Got delegation token at: " + tokenFetchTime);
|
||||||
|
|
||||||
|
// Now try talking to RMService using the delegation token
|
||||||
|
clientRMWithDT = getClientRMProtocolWithDT(token,
|
||||||
|
clientRMService.getBindAddress(), "loginuser2", conf);
|
||||||
|
|
||||||
|
request = Records.newRecord(GetNewApplicationRequest.class);
|
||||||
|
|
||||||
|
try {
|
||||||
|
clientRMWithDT.getNewApplication(request);
|
||||||
|
} catch (UndeclaredThrowableException e) {
|
||||||
|
fail("Unexpected exception" + e);
|
||||||
|
}
|
||||||
|
cancelDelegationToken(loggedInUser, clientRMService, token);
|
||||||
|
if (clientRMWithDT != null) {
|
||||||
|
RPC.stopProxy(clientRMWithDT);
|
||||||
|
clientRMWithDT = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Creating a new connection.
|
||||||
|
clientRMWithDT = getClientRMProtocolWithDT(token,
|
||||||
|
clientRMService.getBindAddress(), "loginuser2", conf);
|
||||||
|
LOG.info("Cancelled delegation token at: " + System.currentTimeMillis());
|
||||||
|
// Verify cancellation worked.
|
||||||
|
try {
|
||||||
|
clientRMWithDT.getNewApplication(request);
|
||||||
|
fail("Should not have succeeded with a cancelled delegation token");
|
||||||
|
} catch (UndeclaredThrowableException e) {
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
rmDtSecretManager.stopThreads();
|
||||||
|
// TODO PRECOMMIT Close proxies.
|
||||||
|
if (clientRMWithDT != null) {
|
||||||
|
RPC.stopProxy(clientRMWithDT);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the delegation token directly as it is a little difficult to setup
|
||||||
|
// the kerberos based rpc.
|
||||||
|
private DelegationToken getDelegationToken(
|
||||||
|
final UserGroupInformation loggedInUser,
|
||||||
|
final ClientRMProtocol clientRMService, final String renewerString)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
DelegationToken token = loggedInUser
|
||||||
|
.doAs(new PrivilegedExceptionAction<DelegationToken>() {
|
||||||
|
@Override
|
||||||
|
public DelegationToken run() throws YarnRemoteException {
|
||||||
|
GetDelegationTokenRequest request = Records
|
||||||
|
.newRecord(GetDelegationTokenRequest.class);
|
||||||
|
request.setRenewer(renewerString);
|
||||||
|
return clientRMService.getDelegationToken(request)
|
||||||
|
.getRMDelegationToken();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return token;
|
||||||
|
}
|
||||||
|
|
||||||
|
private long renewDelegationToken(final UserGroupInformation loggedInUser,
|
||||||
|
final ClientRMProtocol clientRMService, final DelegationToken dToken)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
long nextExpTime = loggedInUser.doAs(new PrivilegedExceptionAction<Long>() {
|
||||||
|
@Override
|
||||||
|
public Long run() throws YarnRemoteException {
|
||||||
|
RenewDelegationTokenRequest request = Records
|
||||||
|
.newRecord(RenewDelegationTokenRequest.class);
|
||||||
|
request.setDelegationToken(dToken);
|
||||||
|
return clientRMService.renewDelegationToken(request)
|
||||||
|
.getNextExpirationTime();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return nextExpTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void cancelDelegationToken(final UserGroupInformation loggedInUser,
|
||||||
|
final ClientRMProtocol clientRMService, final DelegationToken dToken)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
loggedInUser.doAs(new PrivilegedExceptionAction<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void run() throws YarnRemoteException {
|
||||||
|
CancelDelegationTokenRequest request = Records
|
||||||
|
.newRecord(CancelDelegationTokenRequest.class);
|
||||||
|
request.setDelegationToken(dToken);
|
||||||
|
clientRMService.cancelDelegationToken(request);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private ClientRMProtocol getClientRMProtocolWithDT(DelegationToken token,
|
||||||
|
final InetSocketAddress rmAddress, String user, final Configuration conf) {
|
||||||
|
// Maybe consider converting to Hadoop token, serialize de-serialize etc
|
||||||
|
// before trying to renew the token.
|
||||||
|
|
||||||
|
UserGroupInformation ugi = UserGroupInformation
|
||||||
|
.createRemoteUser(user);
|
||||||
|
ugi.addToken(ProtoUtils.convertFromProtoFormat(token, rmAddress));
|
||||||
|
|
||||||
|
final YarnRPC rpc = YarnRPC.create(conf);
|
||||||
|
ClientRMProtocol clientRMWithDT = ugi
|
||||||
|
.doAs(new PrivilegedAction<ClientRMProtocol>() {
|
||||||
|
@Override
|
||||||
|
public ClientRMProtocol run() {
|
||||||
|
return (ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,
|
||||||
|
rmAddress, conf);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return clientRMWithDT;
|
||||||
|
}
|
||||||
|
|
||||||
|
class ClientRMServiceForTest extends ClientRMService {
|
||||||
|
|
||||||
|
public ClientRMServiceForTest(Configuration conf,
|
||||||
|
ResourceScheduler scheduler,
|
||||||
|
RMDelegationTokenSecretManager rmDTSecretManager) {
|
||||||
|
super(mock(RMContext.class), scheduler, mock(RMAppManager.class),
|
||||||
|
new ApplicationACLsManager(conf), rmDTSecretManager);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use a random port unless explicitly specified.
|
||||||
|
@Override
|
||||||
|
InetSocketAddress getBindAddress(Configuration conf) {
|
||||||
|
return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_ADDRESS, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop() {
|
||||||
|
if (rmDTSecretManager != null) {
|
||||||
|
rmDTSecretManager.stopThreads();
|
||||||
|
}
|
||||||
|
super.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ResourceScheduler createMockScheduler(Configuration conf) {
|
||||||
|
ResourceScheduler mockSched = mock(ResourceScheduler.class);
|
||||||
|
doReturn(BuilderUtils.newResource(512)).when(mockSched)
|
||||||
|
.getMinimumResourceCapability();
|
||||||
|
doReturn(BuilderUtils.newResource(5120)).when(mockSched)
|
||||||
|
.getMaximumResourceCapability();
|
||||||
|
return mockSched;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static RMDelegationTokenSecretManager createRMDelegationTokenSecretManager(
|
||||||
|
long secretKeyInterval, long tokenMaxLifetime, long tokenRenewInterval) {
|
||||||
|
RMDelegationTokenSecretManager rmDtSecretManager = new RMDelegationTokenSecretManager(
|
||||||
|
secretKeyInterval, tokenMaxLifetime, tokenRenewInterval, 3600000);
|
||||||
|
return rmDtSecretManager;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue