MAPREDUCE-4894. Renewal / cancellation of JobHistory tokens (Siddharth Seth via tgraves

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1429086 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas Graves 2013-01-04 20:15:43 +00:00
parent dd8fc7e499
commit e17cecf550
29 changed files with 1077 additions and 120 deletions

View File

@ -661,6 +661,9 @@ Release 0.23.6 - UNRELEASED
MAPREDUCE-4832. MR AM can get in a split brain situation (jlowe)
MAPREDUCE-4894. Renewal / cancellation of JobHistory tokens (Siddharth
Seth via tgraves)
Release 0.23.5 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -31,6 +31,8 @@ import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
@ -55,6 +57,8 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
@ -386,5 +390,19 @@ public class MRClientService extends AbstractService
throw RPCUtil.getRemoteException("MR AM not authorized to issue delegation" +
" token");
}
@Override
public RenewDelegationTokenResponse renewDelegationToken(
RenewDelegationTokenRequest request) throws YarnRemoteException {
throw RPCUtil.getRemoteException("MR AM not authorized to renew delegation" +
" token");
}
@Override
public CancelDelegationTokenResponse cancelDelegationToken(
CancelDelegationTokenRequest request) throws YarnRemoteException {
throw RPCUtil.getRemoteException("MR AM not authorized to cancel delegation" +
" token");
}
}
}

View File

@ -18,5 +18,6 @@
package org.apache.hadoop.mapreduce.v2.api;
public interface HSClientProtocol extends MRClientProtocol {
}

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.mapreduce.v2.api;
import java.net.InetSocketAddress;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
@ -44,6 +46,8 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
public interface MRClientProtocol {
@ -64,4 +68,24 @@ public interface MRClientProtocol {
public KillTaskAttemptResponse killTaskAttempt(KillTaskAttemptRequest request) throws YarnRemoteException;
public FailTaskAttemptResponse failTaskAttempt(FailTaskAttemptRequest request) throws YarnRemoteException;
public GetDelegationTokenResponse getDelegationToken(GetDelegationTokenRequest request) throws YarnRemoteException;
/**
* Renew an existing delegation token.
*
* @param request the delegation token to be renewed.
* @return the new expiry time for the delegation token.
* @throws YarnRemoteException
*/
public RenewDelegationTokenResponse renewDelegationToken(
RenewDelegationTokenRequest request) throws YarnRemoteException;
/**
* Cancel an existing delegation token.
*
* @param request the delegation token to be cancelled.
* @return an empty response.
* @throws YarnRemoteException
*/
public CancelDelegationTokenResponse cancelDelegationToken(
CancelDelegationTokenRequest request) throws YarnRemoteException;
}

View File

@ -20,6 +20,7 @@
package org.apache.hadoop.mapreduce.v2.api;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
@ -30,6 +31,8 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdenti
* issued by JobHistoryServer to delegate
* MR tasks talking to the JobHistoryServer.
*/
@Private
// TODO Move to a different package.
public class MRDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {
public static final Text KIND_NAME = new Text("MR_DELEGATION_TOKEN");

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.mapreduce.v2.api.impl.pb.client;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
@ -27,6 +26,8 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocolPB;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
@ -51,6 +52,10 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.CancelDelegationTokenResponsePBImpl;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.FailTaskAttemptRequestPBImpl;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.FailTaskAttemptResponsePBImpl;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.GetCountersRequestPBImpl;
@ -75,6 +80,9 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.KillTaskAttemp
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.KillTaskAttemptResponsePBImpl;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.KillTaskRequestPBImpl;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.KillTaskResponsePBImpl;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.RenewDelegationTokenRequestPBImpl;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.RenewDelegationTokenResponsePBImpl;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.CancelDelegationTokenRequestProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.FailTaskAttemptRequestProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.GetCountersRequestProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.GetDelegationTokenRequestProto;
@ -87,6 +95,7 @@ import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.GetTaskReportsReques
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.KillJobRequestProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.KillTaskAttemptRequestProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.KillTaskRequestProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.RenewDelegationTokenRequestProto;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
@ -243,4 +252,30 @@ public class MRClientProtocolPBClientImpl implements MRClientProtocol {
}
}
@Override
public RenewDelegationTokenResponse renewDelegationToken(
RenewDelegationTokenRequest request) throws YarnRemoteException {
RenewDelegationTokenRequestProto requestProto =
((RenewDelegationTokenRequestPBImpl) request).getProto();
try {
return new RenewDelegationTokenResponsePBImpl(proxy.renewDelegationToken(
null, requestProto));
} catch (ServiceException e) {
throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
}
}
@Override
public CancelDelegationTokenResponse cancelDelegationToken(
CancelDelegationTokenRequest request) throws YarnRemoteException {
CancelDelegationTokenRequestProto requestProto =
((CancelDelegationTokenRequestPBImpl) request).getProto();
try {
return new CancelDelegationTokenResponsePBImpl(
proxy.cancelDelegationToken(null, requestProto));
} catch (ServiceException e) {
throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
}
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2.api.impl.pb.service;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocolPB;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
@ -43,6 +44,9 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.CancelDelegationTokenResponsePBImpl;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.FailTaskAttemptRequestPBImpl;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.FailTaskAttemptResponsePBImpl;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.GetCountersRequestPBImpl;
@ -67,6 +71,10 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.KillTaskAttemp
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.KillTaskAttemptResponsePBImpl;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.KillTaskRequestPBImpl;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.KillTaskResponsePBImpl;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.RenewDelegationTokenRequestPBImpl;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.RenewDelegationTokenResponsePBImpl;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.CancelDelegationTokenRequestProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.CancelDelegationTokenResponseProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.FailTaskAttemptRequestProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.FailTaskAttemptResponseProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.GetCountersRequestProto;
@ -91,6 +99,8 @@ import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.KillTaskAttemptReque
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.KillTaskAttemptResponseProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.KillTaskRequestProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.KillTaskResponseProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.RenewDelegationTokenRequestProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.RenewDelegationTokenResponseProto;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import com.google.protobuf.RpcController;
@ -252,4 +262,32 @@ public class MRClientProtocolPBServiceImpl implements MRClientProtocolPB {
}
}
@Override
public RenewDelegationTokenResponseProto renewDelegationToken(
RpcController controller, RenewDelegationTokenRequestProto proto)
throws ServiceException {
RenewDelegationTokenRequestPBImpl request =
new RenewDelegationTokenRequestPBImpl(proto);
try {
RenewDelegationTokenResponse response = real.renewDelegationToken(request);
return ((RenewDelegationTokenResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) {
throw new ServiceException(e);
}
}
@Override
public CancelDelegationTokenResponseProto cancelDelegationToken(
RpcController controller, CancelDelegationTokenRequestProto proto)
throws ServiceException {
CancelDelegationTokenRequestPBImpl request =
new CancelDelegationTokenRequestPBImpl(proto);
try {
CancelDelegationTokenResponse response = real.cancelDelegationToken(request);
return ((CancelDelegationTokenResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) {
throw new ServiceException(e);
}
}
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.api.records.DelegationToken;
/**
* The request issued by the client to the {@code ResourceManager} to cancel a
* delegation token.
*/
@Public
@Evolving
public interface CancelDelegationTokenRequest {
DelegationToken getDelegationToken();
void setDelegationToken(DelegationToken dToken);
}

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
/**
* The response from the {@code ResourceManager} to a cancelDelegationToken
* request.
*/
@Public
@Evolving
public interface CancelDelegationTokenResponse {
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.api.records.DelegationToken;
/**
* The request issued by the client to renew a delegation token from
* the {@code ResourceManager}.
*/
@Public
@Evolving
public interface RenewDelegationTokenRequest {
DelegationToken getDelegationToken();
void setDelegationToken(DelegationToken dToken);
}

View File

@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
/**
* The response to a renewDelegationToken call to the {@code ResourceManager}.
*/
@Public
@Evolving
public interface RenewDelegationTokenResponse {
long getNextExpirationTime();
void setNextExpirationTime(long expTime);
}

View File

@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.CancelDelegationTokenRequestProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.CancelDelegationTokenRequestProtoOrBuilder;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.DelegationTokenPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.DelegationTokenProto;
public class CancelDelegationTokenRequestPBImpl extends
ProtoBase<CancelDelegationTokenRequestProto> implements
CancelDelegationTokenRequest {
CancelDelegationTokenRequestProto proto =
CancelDelegationTokenRequestProto.getDefaultInstance();
CancelDelegationTokenRequestProto.Builder builder = null;
boolean viaProto = false;
public CancelDelegationTokenRequestPBImpl() {
this.builder = CancelDelegationTokenRequestProto.newBuilder();
}
public CancelDelegationTokenRequestPBImpl (
CancelDelegationTokenRequestProto proto) {
this.proto = proto;
this.viaProto = true;
}
DelegationToken token;
@Override
public DelegationToken getDelegationToken() {
CancelDelegationTokenRequestProtoOrBuilder p = viaProto ? proto : builder;
if (this.token != null) {
return this.token;
}
if (!p.hasDelegationToken()) {
return null;
}
this.token = convertFromProtoFormat(p.getDelegationToken());
return this.token;
}
@Override
public void setDelegationToken(DelegationToken token) {
maybeInitBuilder();
if (token == null)
builder.clearDelegationToken();
this.token = token;
}
@Override
public CancelDelegationTokenRequestProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToBuilder() {
if (token != null) {
builder.setDelegationToken(convertToProtoFormat(this.token));
}
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = CancelDelegationTokenRequestProto.newBuilder(proto);
}
viaProto = false;
}
private DelegationTokenPBImpl convertFromProtoFormat(DelegationTokenProto p) {
return new DelegationTokenPBImpl(p);
}
private DelegationTokenProto convertToProtoFormat(DelegationToken t) {
return ((DelegationTokenPBImpl)t).getProto();
}
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.CancelDelegationTokenResponseProto;
import org.apache.hadoop.yarn.api.records.ProtoBase;
public class CancelDelegationTokenResponsePBImpl extends
ProtoBase<CancelDelegationTokenResponseProto> implements
CancelDelegationTokenResponse {
CancelDelegationTokenResponseProto proto = CancelDelegationTokenResponseProto
.getDefaultInstance();
public CancelDelegationTokenResponsePBImpl() {
}
public CancelDelegationTokenResponsePBImpl(
CancelDelegationTokenResponseProto proto) {
this.proto = proto;
}
@Override
public CancelDelegationTokenResponseProto getProto() {
return proto;
}
}

View File

@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.RenewDelegationTokenRequestProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.RenewDelegationTokenRequestProtoOrBuilder;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.DelegationTokenPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.DelegationTokenProto;
public class RenewDelegationTokenRequestPBImpl extends
ProtoBase<RenewDelegationTokenRequestProto> implements
RenewDelegationTokenRequest {
RenewDelegationTokenRequestProto proto = RenewDelegationTokenRequestProto
.getDefaultInstance();
RenewDelegationTokenRequestProto.Builder builder = null;
boolean viaProto = false;
public RenewDelegationTokenRequestPBImpl() {
this.builder = RenewDelegationTokenRequestProto.newBuilder();
}
public RenewDelegationTokenRequestPBImpl(
RenewDelegationTokenRequestProto proto) {
this.proto = proto;
this.viaProto = true;
}
DelegationToken token;
@Override
public DelegationToken getDelegationToken() {
RenewDelegationTokenRequestProtoOrBuilder p = viaProto ? proto : builder;
if (this.token != null) {
return this.token;
}
if (!p.hasDelegationToken()) {
return null;
}
this.token = convertFromProtoFormat(p.getDelegationToken());
return this.token;
}
@Override
public void setDelegationToken(DelegationToken token) {
maybeInitBuilder();
if (token == null)
builder.clearDelegationToken();
this.token = token;
}
@Override
public RenewDelegationTokenRequestProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToBuilder() {
if (token != null) {
builder.setDelegationToken(convertToProtoFormat(this.token));
}
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = RenewDelegationTokenRequestProto.newBuilder(proto);
}
viaProto = false;
}
private DelegationTokenPBImpl convertFromProtoFormat(DelegationTokenProto p) {
return new DelegationTokenPBImpl(p);
}
private DelegationTokenProto convertToProtoFormat(DelegationToken t) {
return ((DelegationTokenPBImpl) t).getProto();
}
}

View File

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.RenewDelegationTokenResponseProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.RenewDelegationTokenResponseProtoOrBuilder;
import org.apache.hadoop.yarn.api.records.ProtoBase;
public class RenewDelegationTokenResponsePBImpl extends
ProtoBase<RenewDelegationTokenResponseProto> implements
RenewDelegationTokenResponse {
RenewDelegationTokenResponseProto proto =
RenewDelegationTokenResponseProto.getDefaultInstance();
RenewDelegationTokenResponseProto.Builder builder = null;
boolean viaProto = false;
public RenewDelegationTokenResponsePBImpl() {
this.builder = RenewDelegationTokenResponseProto.newBuilder();
}
public RenewDelegationTokenResponsePBImpl (
RenewDelegationTokenResponseProto proto) {
this.proto = proto;
this.viaProto = true;
}
@Override
public RenewDelegationTokenResponseProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = RenewDelegationTokenResponseProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public long getNextExpirationTime() {
RenewDelegationTokenResponseProtoOrBuilder p = viaProto ? proto : builder;
return p.getNextExpiryTs();
}
@Override
public void setNextExpirationTime(long expTime) {
maybeInitBuilder();
builder.setNextExpiryTs(expTime);
}
}

View File

@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.mapreduce.v2.security;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
@InterfaceAudience.Private
public class MRDelegationTokenRenewer extends TokenRenewer {
private static final Log LOG = LogFactory
.getLog(MRDelegationTokenRenewer.class);
@Override
public boolean handleKind(Text kind) {
return MRDelegationTokenIdentifier.KIND_NAME.equals(kind);
}
@Override
public long renew(Token<?> token, Configuration conf) throws IOException,
InterruptedException {
DelegationToken dToken = BuilderUtils.newDelegationToken(
token.getIdentifier(), token.getKind().toString(), token.getPassword(),
token.getService().toString());
MRClientProtocol histProxy = instantiateHistoryProxy(conf,
SecurityUtil.getTokenServiceAddr(token));
try {
RenewDelegationTokenRequest request = Records
.newRecord(RenewDelegationTokenRequest.class);
request.setDelegationToken(dToken);
return histProxy.renewDelegationToken(request).getNextExpirationTime();
} finally {
stopHistoryProxy(histProxy);
}
}
@Override
public void cancel(Token<?> token, Configuration conf) throws IOException,
InterruptedException {
DelegationToken dToken = BuilderUtils.newDelegationToken(
token.getIdentifier(), token.getKind().toString(), token.getPassword(),
token.getService().toString());
MRClientProtocol histProxy = instantiateHistoryProxy(conf,
SecurityUtil.getTokenServiceAddr(token));
try {
CancelDelegationTokenRequest request = Records
.newRecord(CancelDelegationTokenRequest.class);
request.setDelegationToken(dToken);
histProxy.cancelDelegationToken(request);
} finally {
stopHistoryProxy(histProxy);
}
}
@Override
public boolean isManaged(Token<?> token) throws IOException {
return true;
}
protected void stopHistoryProxy(MRClientProtocol proxy) {
RPC.stopProxy(proxy);
}
protected MRClientProtocol instantiateHistoryProxy(final Configuration conf,
final InetSocketAddress hsAddress) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to MRHistoryServer at: " + hsAddress);
}
final YarnRPC rpc = YarnRPC.create(conf);
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
return currentUser.doAs(new PrivilegedAction<MRClientProtocol>() {
@Override
public MRClientProtocol run() {
return (MRClientProtocol) rpc.getProxy(HSClientProtocol.class,
hsAddress, conf);
}
});
}
}

View File

@ -36,4 +36,6 @@ service MRClientProtocolService {
rpc killTask (KillTaskRequestProto) returns (KillTaskResponseProto);
rpc killTaskAttempt (KillTaskAttemptRequestProto) returns (KillTaskAttemptResponseProto);
rpc failTaskAttempt (FailTaskAttemptRequestProto) returns (FailTaskAttemptResponseProto);
rpc renewDelegationToken(RenewDelegationTokenRequestProto) returns (RenewDelegationTokenResponseProto);
rpc cancelDelegationToken(CancelDelegationTokenRequestProto) returns (CancelDelegationTokenResponseProto);
}

View File

@ -107,3 +107,18 @@ message FailTaskAttemptRequestProto {
}
message FailTaskAttemptResponseProto {
}
message RenewDelegationTokenRequestProto {
required DelegationTokenProto delegation_token = 1;
}
message RenewDelegationTokenResponseProto {
required int64 next_expiry_ts = 1;
}
message CancelDelegationTokenRequestProto {
required DelegationTokenProto delegation_token = 1;
}
message CancelDelegationTokenResponseProto {
}

View File

@ -26,6 +26,8 @@ import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
@ -50,6 +52,8 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
@ -199,5 +203,17 @@ public class TestRPCFactories {
GetDelegationTokenRequest request) throws YarnRemoteException {
return null;
}
@Override
public RenewDelegationTokenResponse renewDelegationToken(
RenewDelegationTokenRequest request) throws YarnRemoteException {
return null;
}
@Override
public CancelDelegationTokenResponse cancelDelegationToken(
CancelDelegationTokenRequest request) throws YarnRemoteException {
return null;
}
}
}

View File

@ -496,36 +496,6 @@ public class JobClient extends CLI {
clientUgi = UserGroupInformation.getCurrentUser();
}
@InterfaceAudience.Private
public static class Renewer extends TokenRenewer {
@Override
public boolean handleKind(Text kind) {
return DelegationTokenIdentifier.MAPREDUCE_DELEGATION_KIND.equals(kind);
}
@SuppressWarnings("unchecked")
@Override
public long renew(Token<?> token, Configuration conf
) throws IOException, InterruptedException {
return new Cluster(conf).
renewDelegationToken((Token<DelegationTokenIdentifier>) token);
}
@SuppressWarnings("unchecked")
@Override
public void cancel(Token<?> token, Configuration conf
) throws IOException, InterruptedException {
new Cluster(conf).
cancelDelegationToken((Token<DelegationTokenIdentifier>) token);
}
@Override
public boolean isManaged(Token<?> token) throws IOException {
return true;
}
}
/**
* Build a job client, connect to the indicated job tracker.
*

View File

@ -33,14 +33,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
@ -401,12 +399,7 @@ public class Cluster {
public long renewDelegationToken(Token<DelegationTokenIdentifier> token
) throws InvalidToken, IOException,
InterruptedException {
try {
return client.renewDelegationToken(token);
} catch (RemoteException re) {
throw re.unwrapRemoteException(InvalidToken.class,
AccessControlException.class);
}
return token.renew(getConf());
}
/**
@ -418,12 +411,7 @@ public class Cluster {
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
) throws IOException,
InterruptedException {
try {
client.cancelDelegationToken(token);
} catch (RemoteException re) {
throw re.unwrapRemoteException(InvalidToken.class,
AccessControlException.class);
}
token.cancel(getConf());
}
}

View File

@ -1,2 +1 @@
org.apache.hadoop.mapred.JobClient$Renewer
org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier$Renewer

View File

@ -24,6 +24,7 @@ import java.security.AccessControlException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -37,6 +38,8 @@ import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
@ -61,6 +64,8 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
@ -75,7 +80,6 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -83,6 +87,7 @@ import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
@ -314,10 +319,7 @@ public class HistoryClientService extends AbstractService {
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
// Verify that the connection is kerberos authenticated
AuthenticationMethod authMethod = UserGroupInformation
.getRealAuthenticationMethod(ugi);
if (UserGroupInformation.isSecurityEnabled()
&& (authMethod != AuthenticationMethod.KERBEROS)) {
if (!isAllowedDelegationTokenOp()) {
throw new IOException(
"Delegation Token can be issued only with kerberos authentication");
}
@ -347,6 +349,55 @@ public class HistoryClientService extends AbstractService {
}
}
@Override
public RenewDelegationTokenResponse renewDelegationToken(
RenewDelegationTokenRequest request) throws YarnRemoteException {
try {
if (!isAllowedDelegationTokenOp()) {
throw new IOException(
"Delegation Token can be renewed only with kerberos authentication");
}
DelegationToken protoToken = request.getDelegationToken();
Token<MRDelegationTokenIdentifier> token = new Token<MRDelegationTokenIdentifier>(
protoToken.getIdentifier().array(), protoToken.getPassword()
.array(), new Text(protoToken.getKind()), new Text(
protoToken.getService()));
String user = UserGroupInformation.getCurrentUser().getShortUserName();
long nextExpTime = jhsDTSecretManager.renewToken(token, user);
RenewDelegationTokenResponse renewResponse = Records
.newRecord(RenewDelegationTokenResponse.class);
renewResponse.setNextExpirationTime(nextExpTime);
return renewResponse;
} catch (IOException e) {
throw RPCUtil.getRemoteException(e);
}
}
@Override
public CancelDelegationTokenResponse cancelDelegationToken(
CancelDelegationTokenRequest request) throws YarnRemoteException {
try {
if (!isAllowedDelegationTokenOp()) {
throw new IOException(
"Delegation Token can be cancelled only with kerberos authentication");
}
DelegationToken protoToken = request.getDelegationToken();
Token<MRDelegationTokenIdentifier> token = new Token<MRDelegationTokenIdentifier>(
protoToken.getIdentifier().array(), protoToken.getPassword()
.array(), new Text(protoToken.getKind()), new Text(
protoToken.getService()));
String user = UserGroupInformation.getCurrentUser().getShortUserName();
jhsDTSecretManager.cancelToken(token, user);
return Records.newRecord(CancelDelegationTokenResponse.class);
} catch (IOException e) {
throw RPCUtil.getRemoteException(e);
}
}
private void checkAccess(Job job, JobACL jobOperation)
throws YarnRemoteException {
@ -362,5 +413,18 @@ public class HistoryClientService extends AbstractService {
+ jobOperation.name() + " on " + job.getID()));
}
}
private boolean isAllowedDelegationTokenOp() throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
return EnumSet.of(AuthenticationMethod.KERBEROS,
AuthenticationMethod.KERBEROS_SSL,
AuthenticationMethod.CERTIFICATE)
.contains(UserGroupInformation.getCurrentUser()
.getRealAuthenticationMethod());
} else {
return true;
}
}
}
}

View File

@ -24,6 +24,8 @@ import java.util.HashMap;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
@ -48,6 +50,8 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
@ -216,6 +220,20 @@ public class NotRunningJob implements MRClientProtocol {
throw new NotImplementedException();
}
@Override
public RenewDelegationTokenResponse renewDelegationToken(
RenewDelegationTokenRequest request) throws YarnRemoteException {
/* Should not be invoked by anyone. */
throw new NotImplementedException();
}
@Override
public CancelDelegationTokenResponse cancelDelegationToken(
CancelDelegationTokenRequest request) throws YarnRemoteException {
/* Should not be invoked by anyone. */
throw new NotImplementedException();
}
@Override
public InetSocketAddress getConnectAddress() {
/* Should not be invoked by anyone. Normally used to set token service */

View File

@ -33,7 +33,6 @@ import org.apache.hadoop.mapreduce.QueueAclsInfo;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@ -41,9 +40,9 @@ import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.YarnClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ProtoUtils;
import org.apache.hadoop.yarn.client.YarnClientImpl;
public class ResourceMgrDelegate extends YarnClientImpl {
private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class);
@ -63,11 +62,6 @@ public class ResourceMgrDelegate extends YarnClientImpl {
start();
}
public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
throws IOException, InterruptedException {
return;
}
public TaskTrackerInfo[] getActiveTrackers() throws IOException,
InterruptedException {
return TypeConverter.fromYarnNodes(super.getNodeReports());
@ -168,13 +162,6 @@ public class ResourceMgrDelegate extends YarnClientImpl {
return 0;
}
public long renewDelegationToken(Token<DelegationTokenIdentifier> arg0)
throws IOException, InterruptedException {
// TODO: Implement renewDelegationToken
LOG.warn("renewDelegationToken - Not implemented");
return 0;
}
public ApplicationId getApplicationId() {
return applicationId;
}

View File

@ -158,7 +158,7 @@ public class YARNRunner implements ClientProtocol {
@Override
public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
throws IOException, InterruptedException {
resMgrDelegate.cancelDelegationToken(arg0);
throw new UnsupportedOperationException("Use Token.renew instead");
}
@Override
@ -466,7 +466,7 @@ public class YARNRunner implements ClientProtocol {
@Override
public long renewDelegationToken(Token<DelegationTokenIdentifier> arg0)
throws IOException, InterruptedException {
return resMgrDelegate.renewDelegationToken(arg0);
throw new UnsupportedOperationException("Use Token.renew instead");
}

View File

@ -25,10 +25,10 @@ import java.util.Iterator;
import junit.framework.Assert;
import org.apache.hadoop.ipc.Server;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRConfig;
@ -69,6 +69,8 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@ -87,6 +89,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -339,6 +343,18 @@ public class TestClientRedirect {
GetDelegationTokenRequest request) throws YarnRemoteException {
return null;
}
@Override
public RenewDelegationTokenResponse renewDelegationToken(
RenewDelegationTokenRequest request) throws YarnRemoteException {
return null;
}
@Override
public CancelDelegationTokenResponse cancelDelegationToken(
CancelDelegationTokenRequest request) throws YarnRemoteException {
return null;
}
}
class HistoryService extends AMService implements HSClientProtocol {
@ -504,6 +520,20 @@ public class TestClientRedirect {
throws YarnRemoteException {
return null;
}
@Override
public org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenResponse renewDelegationToken(
org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest request)
throws YarnRemoteException {
return null;
}
@Override
public org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenResponse cancelDelegationToken(
org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest request)
throws YarnRemoteException {
return null;
}
}
static Counters getMyCounters() {

View File

@ -18,19 +18,29 @@
package org.apache.hadoop.mapreduce.security;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.hs.JHSDelegationTokenSecretManager;
import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
@ -49,6 +59,8 @@ import org.junit.Test;
public class TestJHSSecurity {
private static final Log LOG = LogFactory.getLog(TestJHSSecurity.class);
@Test
public void testDelegationToken() throws IOException, InterruptedException {
@ -64,54 +76,207 @@ public class TestJHSSecurity {
"kerberos");
UserGroupInformation.setConfiguration(conf);
final JobHistoryServer jobHistoryServer = new JobHistoryServer() {
final long initialInterval = 10000l;
final long maxLifetime= 20000l;
final long renewInterval = 10000l;
JobHistoryServer jobHistoryServer = null;
MRClientProtocol clientUsingDT = null;
long tokenFetchTime;
try {
jobHistoryServer = new JobHistoryServer() {
protected void doSecureLogin(Configuration conf) throws IOException {
// no keytab based login
};
protected JHSDelegationTokenSecretManager createJHSSecretManager(
Configuration conf) {
return new JHSDelegationTokenSecretManager(initialInterval,
maxLifetime, renewInterval, 3600000);
}
};
// final JobHistoryServer jobHistoryServer = jhServer;
jobHistoryServer.init(conf);
jobHistoryServer.start();
final MRClientProtocol hsService = jobHistoryServer.getClientService()
.getClientHandler();
// Fake the authentication-method
UserGroupInformation loggedInUser = UserGroupInformation.getCurrentUser();
UserGroupInformation loggedInUser = UserGroupInformation
.createRemoteUser("testrenewer@APACHE.ORG");
Assert.assertEquals("testrenewer", loggedInUser.getShortUserName());
// Default realm is APACHE.ORG
loggedInUser.setAuthenticationMethod(AuthenticationMethod.KERBEROS);
// Get the delegation token directly as it is a little difficult to setup
// the kerberos based rpc.
DelegationToken token =
loggedInUser.doAs(new PrivilegedExceptionAction<DelegationToken>() {
@Override
public DelegationToken run() throws YarnRemoteException {
GetDelegationTokenRequest request =
Records.newRecord(GetDelegationTokenRequest.class);
request.setRenewer("OneRenewerToRuleThemAll");
return jobHistoryServer.getClientService().getClientHandler()
.getDelegationToken(request).getDelegationToken();
}
});
DelegationToken token = getDelegationToken(loggedInUser, hsService,
loggedInUser.getShortUserName());
tokenFetchTime = System.currentTimeMillis();
LOG.info("Got delegation token at: " + tokenFetchTime);
// Now try talking to JHS using the delegation token
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser("TheDarkLord");
ugi.addToken(ProtoUtils.convertFromProtoFormat(
token, jobHistoryServer.getClientService().getBindAddress()));
final YarnRPC rpc = YarnRPC.create(conf);
MRClientProtocol userUsingDT =
ugi.doAs(new PrivilegedAction<MRClientProtocol>() {
@Override
public MRClientProtocol run() {
return (MRClientProtocol) rpc.getProxy(HSClientProtocol.class,
jobHistoryServer.getClientService().getBindAddress(), conf);
}
});
clientUsingDT = getMRClientProtocol(token, jobHistoryServer
.getClientService().getBindAddress(), "TheDarkLord", conf);
GetJobReportRequest jobReportRequest =
Records.newRecord(GetJobReportRequest.class);
jobReportRequest.setJobId(MRBuilderUtils.newJobId(123456, 1, 1));
try {
userUsingDT.getJobReport(jobReportRequest);
clientUsingDT.getJobReport(jobReportRequest);
} catch (YarnRemoteException e) {
Assert.assertEquals("Unknown job job_123456_0001", e.getMessage());
}
// Renew after 50% of token age.
while(System.currentTimeMillis() < tokenFetchTime + initialInterval / 2) {
Thread.sleep(500l);
}
long nextExpTime = renewDelegationToken(loggedInUser, hsService, token);
long renewalTime = System.currentTimeMillis();
LOG.info("Renewed token at: " + renewalTime + ", NextExpiryTime: "
+ nextExpTime);
// Wait for first expiry, but before renewed expiry.
while (System.currentTimeMillis() > tokenFetchTime + initialInterval
&& System.currentTimeMillis() < nextExpTime) {
Thread.sleep(500l);
}
Thread.sleep(50l);
// Valid token because of renewal.
try {
clientUsingDT.getJobReport(jobReportRequest);
} catch (UndeclaredThrowableException e) {
Assert.assertEquals("Unknown job job_123456_0001", e.getMessage());
}
// Wait for expiry.
while(System.currentTimeMillis() < renewalTime + renewInterval) {
Thread.sleep(500l);
}
Thread.sleep(50l);
LOG.info("At time: " + System.currentTimeMillis() + ", token should be invalid");
// Token should have expired.
try {
clientUsingDT.getJobReport(jobReportRequest);
fail("Should not have succeeded with an expired token");
} catch (UndeclaredThrowableException e) {
assertTrue(e.getCause().getMessage().contains("is expired"));
}
// Test cancellation
// Stop the existing proxy, start another.
if (clientUsingDT != null) {
// RPC.stopProxy(clientUsingDT);
clientUsingDT = null;
}
token = getDelegationToken(loggedInUser, hsService,
loggedInUser.getShortUserName());
tokenFetchTime = System.currentTimeMillis();
LOG.info("Got delegation token at: " + tokenFetchTime);
// Now try talking to HSService using the delegation token
clientUsingDT = getMRClientProtocol(token, jobHistoryServer
.getClientService().getBindAddress(), "loginuser2", conf);
try {
clientUsingDT.getJobReport(jobReportRequest);
} catch (UndeclaredThrowableException e) {
fail("Unexpected exception" + e);
}
cancelDelegationToken(loggedInUser, hsService, token);
if (clientUsingDT != null) {
// RPC.stopProxy(clientUsingDT);
clientUsingDT = null;
}
// Creating a new connection.
clientUsingDT = getMRClientProtocol(token, jobHistoryServer
.getClientService().getBindAddress(), "loginuser2", conf);
LOG.info("Cancelled delegation token at: " + System.currentTimeMillis());
// Verify cancellation worked.
try {
clientUsingDT.getJobReport(jobReportRequest);
fail("Should not have succeeded with a cancelled delegation token");
} catch (UndeclaredThrowableException e) {
}
} finally {
jobHistoryServer.stop();
}
}
private DelegationToken getDelegationToken(
final UserGroupInformation loggedInUser,
final MRClientProtocol hsService, final String renewerString)
throws IOException, InterruptedException {
// Get the delegation token directly as it is a little difficult to setup
// the kerberos based rpc.
DelegationToken token = loggedInUser
.doAs(new PrivilegedExceptionAction<DelegationToken>() {
@Override
public DelegationToken run() throws YarnRemoteException {
GetDelegationTokenRequest request = Records
.newRecord(GetDelegationTokenRequest.class);
request.setRenewer(renewerString);
return hsService.getDelegationToken(request).getDelegationToken();
}
});
return token;
}
private long renewDelegationToken(final UserGroupInformation loggedInUser,
final MRClientProtocol hsService, final DelegationToken dToken)
throws IOException, InterruptedException {
long nextExpTime = loggedInUser.doAs(new PrivilegedExceptionAction<Long>() {
@Override
public Long run() throws YarnRemoteException {
RenewDelegationTokenRequest request = Records
.newRecord(RenewDelegationTokenRequest.class);
request.setDelegationToken(dToken);
return hsService.renewDelegationToken(request).getNextExpirationTime();
}
});
return nextExpTime;
}
private void cancelDelegationToken(final UserGroupInformation loggedInUser,
final MRClientProtocol hsService, final DelegationToken dToken)
throws IOException, InterruptedException {
loggedInUser.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws YarnRemoteException {
CancelDelegationTokenRequest request = Records
.newRecord(CancelDelegationTokenRequest.class);
request.setDelegationToken(dToken);
hsService.cancelDelegationToken(request);
return null;
}
});
}
private MRClientProtocol getMRClientProtocol(DelegationToken token,
final InetSocketAddress hsAddress, String user, final Configuration conf) {
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
ugi.addToken(ProtoUtils.convertFromProtoFormat(token, hsAddress));
final YarnRPC rpc = YarnRPC.create(conf);
MRClientProtocol hsWithDT = ugi
.doAs(new PrivilegedAction<MRClientProtocol>() {
@Override
public MRClientProtocol run() {
return (MRClientProtocol) rpc.getProxy(HSClientProtocol.class,
hsAddress, conf);
}
});
return hsWithDT;
}
}