Merge MAPREDUCE-3955 from trunk. Change MR to use ProtobufRpcEngine from hadoop-common instead of ProtoOverHadoopRpcEngine. (Contributed by Jitendra Nath Pandey)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1306690 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Siddharth Seth 2012-03-29 02:04:21 +00:00
parent ed41b8dfe3
commit 55639bea52
48 changed files with 479 additions and 816 deletions

View File

@ -40,6 +40,9 @@ Release 2.0.0 - UNRELEASED
MAPREDUCE-3353. Add a channel between RM and AM to get information on MAPREDUCE-3353. Add a channel between RM and AM to get information on
nodes. (Bikas Saha via acmurthy) nodes. (Bikas Saha via acmurthy)
MAPREDUCE-3955. Change MR to use ProtobufRpcEngine from hadoop-common
instead of ProtoOverHadoopRpcEngine. (Jitendra Nath Pandey via sseth)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -21,12 +21,12 @@
import java.lang.annotation.Annotation; import java.lang.annotation.Annotation;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocolPB;
import org.apache.hadoop.security.KerberosInfo; import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.security.SecurityInfo; import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenInfo; import org.apache.hadoop.security.token.TokenInfo;
import org.apache.hadoop.security.token.TokenSelector; import org.apache.hadoop.security.token.TokenSelector;
import org.apache.hadoop.yarn.proto.MRClientProtocol;
import org.apache.hadoop.yarn.security.client.ClientTokenSelector; import org.apache.hadoop.yarn.security.client.ClientTokenSelector;
public class MRClientSecurityInfo extends SecurityInfo { public class MRClientSecurityInfo extends SecurityInfo {
@ -38,7 +38,7 @@ public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) {
@Override @Override
public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) { public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) {
if (!protocol.equals(MRClientProtocol.MRClientProtocolService.BlockingInterface.class)) { if (!protocol.equals(MRClientProtocolPB.class)) {
return null; return null;
} }
return new TokenInfo() { return new TokenInfo() {

View File

@ -19,10 +19,10 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.v2.api.HSClientProtocolPB;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.Service; import org.apache.hadoop.security.authorize.Service;
import org.apache.hadoop.yarn.proto.HSClientProtocol;
/** /**
* {@link PolicyProvider} for YARN MapReduce protocols. * {@link PolicyProvider} for YARN MapReduce protocols.
@ -35,7 +35,7 @@ public class ClientHSPolicyProvider extends PolicyProvider {
new Service[] { new Service[] {
new Service( new Service(
JHAdminConfig.MR_HS_SECURITY_SERVICE_AUTHORIZATION, JHAdminConfig.MR_HS_SECURITY_SERVICE_AUTHORIZATION,
HSClientProtocol.HSClientProtocolService.BlockingInterface.class) HSClientProtocolPB.class)
}; };
@Override @Override

View File

@ -21,9 +21,9 @@
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol; import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocolPB;
import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.Service; import org.apache.hadoop.security.authorize.Service;
import org.apache.hadoop.yarn.proto.MRClientProtocol;
/** /**
* {@link PolicyProvider} for YARN MapReduce protocols. * {@link PolicyProvider} for YARN MapReduce protocols.
@ -39,7 +39,7 @@ public class MRAMPolicyProvider extends PolicyProvider {
TaskUmbilicalProtocol.class), TaskUmbilicalProtocol.class),
new Service( new Service(
MRJobConfig.MR_AM_SECURITY_SERVICE_AUTHORIZATION_CLIENT, MRJobConfig.MR_AM_SECURITY_SERVICE_AUTHORIZATION_CLIENT,
MRClientProtocol.MRClientProtocolService.BlockingInterface.class) MRClientProtocolPB.class)
}; };
@Override @Override

View File

@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.api;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.yarn.proto.HSClientProtocol.HSClientProtocolService;
@ProtocolInfo(protocolName = "org.apache.hadoop.mapreduce.v2.api.HSClientProtocolPB",
protocolVersion = 1)
public interface HSClientProtocolPB extends HSClientProtocolService.BlockingInterface {
}

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.api;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.yarn.proto.MRClientProtocol.MRClientProtocolService;
@ProtocolInfo(
protocolName = "org.apache.hadoop.mapreduce.v2.api.MRClientProtocolPB",
protocolVersion = 1)
public interface MRClientProtocolPB extends MRClientProtocolService.BlockingInterface {
}

View File

@ -22,10 +22,10 @@
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol; import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
import org.apache.hadoop.yarn.ipc.ProtoOverHadoopRpcEngine; import org.apache.hadoop.mapreduce.v2.api.HSClientProtocolPB;
import org.apache.hadoop.yarn.proto.HSClientProtocol.HSClientProtocolService;
public class HSClientProtocolPBClientImpl extends MRClientProtocolPBClientImpl public class HSClientProtocolPBClientImpl extends MRClientProtocolPBClientImpl
implements HSClientProtocol { implements HSClientProtocol {
@ -33,9 +33,9 @@ public class HSClientProtocolPBClientImpl extends MRClientProtocolPBClientImpl
public HSClientProtocolPBClientImpl(long clientVersion, public HSClientProtocolPBClientImpl(long clientVersion,
InetSocketAddress addr, Configuration conf) throws IOException { InetSocketAddress addr, Configuration conf) throws IOException {
super(); super();
RPC.setProtocolEngine(conf, HSClientProtocolService.BlockingInterface.class, RPC.setProtocolEngine(conf, HSClientProtocolPB.class,
ProtoOverHadoopRpcEngine.class); ProtobufRpcEngine.class);
proxy = (HSClientProtocolService.BlockingInterface)RPC.getProxy( proxy = (HSClientProtocolPB)RPC.getProxy(
HSClientProtocolService.BlockingInterface.class, clientVersion, addr, conf); HSClientProtocolPB.class, clientVersion, addr, conf);
} }
} }

View File

@ -23,8 +23,10 @@
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocolPB;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
@ -86,21 +88,20 @@
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.KillTaskAttemptRequestProto; import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.KillTaskAttemptRequestProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.KillTaskRequestProto; import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.KillTaskRequestProto;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.ProtoOverHadoopRpcEngine; import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
import org.apache.hadoop.yarn.proto.MRClientProtocol.MRClientProtocolService;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
public class MRClientProtocolPBClientImpl implements MRClientProtocol { public class MRClientProtocolPBClientImpl implements MRClientProtocol {
protected MRClientProtocolService.BlockingInterface proxy; protected MRClientProtocolPB proxy;
public MRClientProtocolPBClientImpl() {}; public MRClientProtocolPBClientImpl() {};
public MRClientProtocolPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException { public MRClientProtocolPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, MRClientProtocolService.BlockingInterface.class, ProtoOverHadoopRpcEngine.class); RPC.setProtocolEngine(conf, MRClientProtocolPB.class, ProtobufRpcEngine.class);
proxy = (MRClientProtocolService.BlockingInterface)RPC.getProxy( proxy = (MRClientProtocolPB)RPC.getProxy(
MRClientProtocolService.BlockingInterface.class, clientVersion, addr, conf); MRClientProtocolPB.class, clientVersion, addr, conf);
} }
@Override @Override
@ -110,13 +111,7 @@ public GetJobReportResponse getJobReport(GetJobReportRequest request)
try { try {
return new GetJobReportResponsePBImpl(proxy.getJobReport(null, requestProto)); return new GetJobReportResponsePBImpl(proxy.getJobReport(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }
@ -127,13 +122,7 @@ public GetTaskReportResponse getTaskReport(GetTaskReportRequest request)
try { try {
return new GetTaskReportResponsePBImpl(proxy.getTaskReport(null, requestProto)); return new GetTaskReportResponsePBImpl(proxy.getTaskReport(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }
@ -144,13 +133,7 @@ public GetTaskAttemptReportResponse getTaskAttemptReport(
try { try {
return new GetTaskAttemptReportResponsePBImpl(proxy.getTaskAttemptReport(null, requestProto)); return new GetTaskAttemptReportResponsePBImpl(proxy.getTaskAttemptReport(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }
@ -161,13 +144,7 @@ public GetCountersResponse getCounters(GetCountersRequest request)
try { try {
return new GetCountersResponsePBImpl(proxy.getCounters(null, requestProto)); return new GetCountersResponsePBImpl(proxy.getCounters(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }
@ -178,13 +155,7 @@ public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(
try { try {
return new GetTaskAttemptCompletionEventsResponsePBImpl(proxy.getTaskAttemptCompletionEvents(null, requestProto)); return new GetTaskAttemptCompletionEventsResponsePBImpl(proxy.getTaskAttemptCompletionEvents(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }
@ -195,13 +166,7 @@ public GetTaskReportsResponse getTaskReports(GetTaskReportsRequest request)
try { try {
return new GetTaskReportsResponsePBImpl(proxy.getTaskReports(null, requestProto)); return new GetTaskReportsResponsePBImpl(proxy.getTaskReports(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }
@ -212,13 +177,7 @@ public GetDiagnosticsResponse getDiagnostics(GetDiagnosticsRequest request)
try { try {
return new GetDiagnosticsResponsePBImpl(proxy.getDiagnostics(null, requestProto)); return new GetDiagnosticsResponsePBImpl(proxy.getDiagnostics(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }
@ -231,13 +190,7 @@ public GetDelegationTokenResponse getDelegationToken(
return new GetDelegationTokenResponsePBImpl(proxy.getDelegationToken( return new GetDelegationTokenResponsePBImpl(proxy.getDelegationToken(
null, requestProto)); null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }
@ -248,13 +201,7 @@ public KillJobResponse killJob(KillJobRequest request)
try { try {
return new KillJobResponsePBImpl(proxy.killJob(null, requestProto)); return new KillJobResponsePBImpl(proxy.killJob(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }
@ -265,13 +212,7 @@ public KillTaskResponse killTask(KillTaskRequest request)
try { try {
return new KillTaskResponsePBImpl(proxy.killTask(null, requestProto)); return new KillTaskResponsePBImpl(proxy.killTask(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }
@ -282,13 +223,7 @@ public KillTaskAttemptResponse killTaskAttempt(KillTaskAttemptRequest request)
try { try {
return new KillTaskAttemptResponsePBImpl(proxy.killTaskAttempt(null, requestProto)); return new KillTaskAttemptResponsePBImpl(proxy.killTaskAttempt(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }
@ -299,13 +234,7 @@ public FailTaskAttemptResponse failTaskAttempt(FailTaskAttemptRequest request)
try { try {
return new FailTaskAttemptResponsePBImpl(proxy.failTaskAttempt(null, requestProto)); return new FailTaskAttemptResponsePBImpl(proxy.failTaskAttempt(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }

View File

@ -19,10 +19,10 @@
package org.apache.hadoop.mapreduce.v2.api.impl.pb.service; package org.apache.hadoop.mapreduce.v2.api.impl.pb.service;
import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol; import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
import org.apache.hadoop.yarn.proto.HSClientProtocol.HSClientProtocolService.BlockingInterface; import org.apache.hadoop.mapreduce.v2.api.HSClientProtocolPB;
public class HSClientProtocolPBServiceImpl extends MRClientProtocolPBServiceImpl public class HSClientProtocolPBServiceImpl extends MRClientProtocolPBServiceImpl
implements BlockingInterface { implements HSClientProtocolPB {
public HSClientProtocolPBServiceImpl(HSClientProtocol impl) { public HSClientProtocolPBServiceImpl(HSClientProtocol impl) {
super(impl); super(impl);
} }

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.mapreduce.v2.api.impl.pb.service; package org.apache.hadoop.mapreduce.v2.api.impl.pb.service;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocolPB;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
@ -91,12 +92,11 @@
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.KillTaskRequestProto; import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.KillTaskRequestProto;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.KillTaskResponseProto; import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.KillTaskResponseProto;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.proto.MRClientProtocol.MRClientProtocolService.BlockingInterface;
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
public class MRClientProtocolPBServiceImpl implements BlockingInterface { public class MRClientProtocolPBServiceImpl implements MRClientProtocolPB {
private MRClientProtocol real; private MRClientProtocol real;

View File

@ -21,20 +21,20 @@
import java.lang.annotation.Annotation; import java.lang.annotation.Annotation;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.HSClientProtocolPB;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.security.KerberosInfo; import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.security.SecurityInfo; import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenInfo; import org.apache.hadoop.security.token.TokenInfo;
import org.apache.hadoop.security.token.TokenSelector; import org.apache.hadoop.security.token.TokenSelector;
import org.apache.hadoop.yarn.proto.HSClientProtocol;
public class ClientHSSecurityInfo extends SecurityInfo { public class ClientHSSecurityInfo extends SecurityInfo {
@Override @Override
public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) { public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) {
if (!protocol if (!protocol
.equals(HSClientProtocol.HSClientProtocolService.BlockingInterface.class)) { .equals(HSClientProtocolPB.class)) {
return null; return null;
} }
return new KerberosInfo() { return new KerberosInfo() {
@ -59,7 +59,7 @@ public String clientPrincipal() {
@Override @Override
public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) { public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) {
if (!protocol if (!protocol
.equals(HSClientProtocol.HSClientProtocolService.BlockingInterface.class)) { .equals(HSClientProtocolPB.class)) {
return null; return null;
} }
return new TokenInfo() { return new TokenInfo() {

View File

@ -18,14 +18,23 @@
package org.apache.hadoop.yarn.proto; package org.apache.hadoop.yarn.proto;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocolPB;
import org.apache.hadoop.yarn.proto.MRClientProtocol.MRClientProtocolService;
/** /**
* Fake protocol to differentiate the blocking interfaces in the * Fake protocol to differentiate the blocking interfaces in the
* security info class loaders. * security info class loaders.
*/ */
public interface HSClientProtocol { public interface HSClientProtocol {
public abstract class HSClientProtocolService { public abstract class HSClientProtocolService {
public interface BlockingInterface extends public interface BlockingInterface extends MRClientProtocolPB {
MRClientProtocol.MRClientProtocolService.BlockingInterface { }
public static com.google.protobuf.BlockingService newReflectiveBlockingService(
final HSClientProtocolService.BlockingInterface impl) {
// The cast is safe
return MRClientProtocolService
.newReflectiveBlockingService((MRClientProtocolService.BlockingInterface) impl);
} }
} }
} }

View File

@ -36,6 +36,7 @@
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier; import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
@ -96,7 +97,7 @@ public class HistoryClientService extends AbstractService {
private static final Log LOG = LogFactory.getLog(HistoryClientService.class); private static final Log LOG = LogFactory.getLog(HistoryClientService.class);
private MRClientProtocol protocolHandler; private HSClientProtocol protocolHandler;
private Server server; private Server server;
private WebApp webApp; private WebApp webApp;
private InetSocketAddress bindAddress; private InetSocketAddress bindAddress;
@ -107,7 +108,7 @@ public HistoryClientService(HistoryContext history,
JHSDelegationTokenSecretManager jhsDTSecretManager) { JHSDelegationTokenSecretManager jhsDTSecretManager) {
super("HistoryClientService"); super("HistoryClientService");
this.history = history; this.history = history;
this.protocolHandler = new MRClientProtocolHandler(); this.protocolHandler = new HSClientProtocolHandler();
this.jhsDTSecretManager = jhsDTSecretManager; this.jhsDTSecretManager = jhsDTSecretManager;
} }
@ -128,7 +129,7 @@ public void start() {
} }
server = server =
rpc.getServer(MRClientProtocol.class, protocolHandler, address, rpc.getServer(HSClientProtocol.class, protocolHandler, address,
conf, jhsDTSecretManager, conf, jhsDTSecretManager,
conf.getInt(JHAdminConfig.MR_HISTORY_CLIENT_THREAD_COUNT, conf.getInt(JHAdminConfig.MR_HISTORY_CLIENT_THREAD_COUNT,
JHAdminConfig.DEFAULT_MR_HISTORY_CLIENT_THREAD_COUNT)); JHAdminConfig.DEFAULT_MR_HISTORY_CLIENT_THREAD_COUNT));
@ -177,7 +178,7 @@ public InetSocketAddress getBindAddress() {
return this.bindAddress; return this.bindAddress;
} }
private class MRClientProtocolHandler implements MRClientProtocol { private class HSClientProtocolHandler implements HSClientProtocol {
private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
@ -340,9 +341,10 @@ public GetDelegationTokenResponse getDelegationToken(
} }
} }
class HistoryService extends AMService { class HistoryService extends AMService implements HSClientProtocol {
public HistoryService() { public HistoryService() {
super(HSHOSTADDRESS); super(HSHOSTADDRESS);
this.protocol = HSClientProtocol.class;
} }
@Override @Override
@ -357,6 +359,7 @@ public GetCountersResponse getCounters(GetCountersRequest request) throws YarnRe
class AMService extends AbstractService class AMService extends AbstractService
implements MRClientProtocol { implements MRClientProtocol {
protected Class<?> protocol;
private InetSocketAddress bindAddress; private InetSocketAddress bindAddress;
private Server server; private Server server;
private final String hostAddress; private final String hostAddress;
@ -367,6 +370,7 @@ public AMService() {
public AMService(String hostAddress) { public AMService(String hostAddress) {
super("AMService"); super("AMService");
this.protocol = MRClientProtocol.class;
this.hostAddress = hostAddress; this.hostAddress = hostAddress;
} }
@ -383,7 +387,7 @@ public void start(Configuration conf) {
} }
server = server =
rpc.getServer(MRClientProtocol.class, this, address, rpc.getServer(protocol, this, address,
conf, null, 1); conf, null, 1);
server.start(); server.start();
this.bindAddress = this.bindAddress =

View File

@ -35,7 +35,7 @@
import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@ -142,7 +142,7 @@ public void testJobHistoryData() throws IOException, InterruptedException,
LOG.info("CounterMR " + counterMR); LOG.info("CounterMR " + counterMR);
Assert.assertEquals(counterHS, counterMR); Assert.assertEquals(counterHS, counterMR);
MRClientProtocol historyClient = instantiateHistoryProxy(); HSClientProtocol historyClient = instantiateHistoryProxy();
GetJobReportRequest gjReq = Records.newRecord(GetJobReportRequest.class); GetJobReportRequest gjReq = Records.newRecord(GetJobReportRequest.class);
gjReq.setJobId(jobId); gjReq.setJobId(jobId);
JobReport jobReport = historyClient.getJobReport(gjReq).getJobReport(); JobReport jobReport = historyClient.getJobReport(gjReq).getJobReport();
@ -164,12 +164,12 @@ private void verifyJobReport(JobReport jobReport, JobId jobId) {
&& jobReport.getFinishTime() >= jobReport.getStartTime()); && jobReport.getFinishTime() >= jobReport.getStartTime());
} }
private MRClientProtocol instantiateHistoryProxy() { private HSClientProtocol instantiateHistoryProxy() {
final String serviceAddr = final String serviceAddr =
mrCluster.getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS); mrCluster.getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS);
final YarnRPC rpc = YarnRPC.create(conf); final YarnRPC rpc = YarnRPC.create(conf);
MRClientProtocol historyClient = HSClientProtocol historyClient =
(MRClientProtocol) rpc.getProxy(MRClientProtocol.class, (HSClientProtocol) rpc.getProxy(HSClientProtocol.class,
NetUtils.createSocketAddr(serviceAddr), mrCluster.getConfig()); NetUtils.createSocketAddr(serviceAddr), mrCluster.getConfig());
return historyClient; return historyClient;
} }

View File

@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.yarn.proto.AMRMProtocol.AMRMProtocolService;
@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.api.AMRMProtocolPB",
protocolVersion = 1)
public interface AMRMProtocolPB extends AMRMProtocolService.BlockingInterface {
}

View File

@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.yarn.proto.ClientRMProtocol.ClientRMProtocolService;
@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.api.ClientRMProtocolPB",
protocolVersion = 1)
public interface ClientRMProtocolPB extends ClientRMProtocolService.BlockingInterface {
}

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.yarn.proto.ContainerManager.ContainerManagerService;
@ProtocolInfo(
protocolName = "org.apache.hadoop.yarn.api.ContainerManagerPB",
protocolVersion = 1)
public interface ContainerManagerPB extends ContainerManagerService.BlockingInterface {
}

View File

@ -18,12 +18,16 @@
package org.apache.hadoop.yarn.exceptions.impl.pb; package org.apache.hadoop.yarn.exceptions.impl.pb;
import java.io.IOException;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.lang.reflect.UndeclaredThrowableException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.proto.YarnProtos.YarnRemoteExceptionProto; import org.apache.hadoop.yarn.proto.YarnProtos.YarnRemoteExceptionProto;
import org.apache.hadoop.yarn.proto.YarnProtos.YarnRemoteExceptionProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnProtos.YarnRemoteExceptionProtoOrBuilder;
import com.google.protobuf.ServiceException;
public class YarnRemoteExceptionPBImpl extends YarnRemoteException { public class YarnRemoteExceptionPBImpl extends YarnRemoteException {
@ -105,4 +109,30 @@ private void maybeInitBuilder() {
} }
viaProto = false; viaProto = false;
} }
/**
* Utility method that unwraps and throws appropriate exception.
* @param se ServiceException
* @throws YarnRemoteException
* @throws UndeclaredThrowableException
*/
public static YarnRemoteException unwrapAndThrowException(ServiceException se)
throws UndeclaredThrowableException {
if (se.getCause() instanceof RemoteException) {
try {
throw ((RemoteException) se.getCause())
.unwrapRemoteException(YarnRemoteExceptionPBImpl.class);
} catch (YarnRemoteException ex) {
return ex;
} catch (IOException e1) {
throw new UndeclaredThrowableException(e1);
}
} else if (se.getCause() instanceof YarnRemoteException) {
return (YarnRemoteException)se.getCause();
} else if (se.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)se.getCause();
} else {
throw new UndeclaredThrowableException(se);
}
}
} }

View File

@ -19,12 +19,13 @@
package org.apache.hadoop.yarn.api.impl.pb.client; package org.apache.hadoop.yarn.api.impl.pb.client;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.api.AMRMProtocol; import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.AMRMProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@ -38,8 +39,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.ProtoOverHadoopRpcEngine; import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
import org.apache.hadoop.yarn.proto.AMRMProtocol.AMRMProtocolService;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto;
@ -48,12 +48,12 @@
public class AMRMProtocolPBClientImpl implements AMRMProtocol { public class AMRMProtocolPBClientImpl implements AMRMProtocol {
private AMRMProtocolService.BlockingInterface proxy; private AMRMProtocolPB proxy;
public AMRMProtocolPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException { public AMRMProtocolPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, AMRMProtocolService.BlockingInterface.class, ProtoOverHadoopRpcEngine.class); RPC.setProtocolEngine(conf, AMRMProtocolPB.class, ProtobufRpcEngine.class);
proxy = (AMRMProtocolService.BlockingInterface)RPC.getProxy( proxy = (AMRMProtocolPB)RPC.getProxy(
AMRMProtocolService.BlockingInterface.class, clientVersion, addr, conf); AMRMProtocolPB.class, clientVersion, addr, conf);
} }
@ -64,13 +64,7 @@ public AllocateResponse allocate(AllocateRequest request)
try { try {
return new AllocateResponsePBImpl(proxy.allocate(null, requestProto)); return new AllocateResponsePBImpl(proxy.allocate(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }
@ -83,13 +77,7 @@ public FinishApplicationMasterResponse finishApplicationMaster(
try { try {
return new FinishApplicationMasterResponsePBImpl(proxy.finishApplicationMaster(null, requestProto)); return new FinishApplicationMasterResponsePBImpl(proxy.finishApplicationMaster(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }
@ -100,13 +88,7 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
try { try {
return new RegisterApplicationMasterResponsePBImpl(proxy.registerApplicationMaster(null, requestProto)); return new RegisterApplicationMasterResponsePBImpl(proxy.registerApplicationMaster(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }
} }

View File

@ -19,12 +19,13 @@
package org.apache.hadoop.yarn.api.impl.pb.client; package org.apache.hadoop.yarn.api.impl.pb.client;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.api.ClientRMProtocol; import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.ClientRMProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@ -66,8 +67,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.ProtoOverHadoopRpcEngine; import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
import org.apache.hadoop.yarn.proto.ClientRMProtocol.ClientRMProtocolService;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationReportRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationReportRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterMetricsRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterMetricsRequestProto;
@ -83,12 +83,12 @@
public class ClientRMProtocolPBClientImpl implements ClientRMProtocol { public class ClientRMProtocolPBClientImpl implements ClientRMProtocol {
private ClientRMProtocolService.BlockingInterface proxy; private ClientRMProtocolPB proxy;
public ClientRMProtocolPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException { public ClientRMProtocolPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, ClientRMProtocolService.BlockingInterface.class, ProtoOverHadoopRpcEngine.class); RPC.setProtocolEngine(conf, ClientRMProtocolPB.class, ProtobufRpcEngine.class);
proxy = (ClientRMProtocolService.BlockingInterface)RPC.getProxy( proxy = (ClientRMProtocolPB)RPC.getProxy(
ClientRMProtocolService.BlockingInterface.class, clientVersion, addr, conf); ClientRMProtocolPB.class, clientVersion, addr, conf);
} }
@Override @Override
@ -98,13 +98,7 @@ public KillApplicationResponse forceKillApplication(
try { try {
return new KillApplicationResponsePBImpl(proxy.forceKillApplication(null, requestProto)); return new KillApplicationResponsePBImpl(proxy.forceKillApplication(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }
@ -115,13 +109,7 @@ public GetApplicationReportResponse getApplicationReport(
try { try {
return new GetApplicationReportResponsePBImpl(proxy.getApplicationReport(null, requestProto)); return new GetApplicationReportResponsePBImpl(proxy.getApplicationReport(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }
@ -132,13 +120,7 @@ public GetClusterMetricsResponse getClusterMetrics(
try { try {
return new GetClusterMetricsResponsePBImpl(proxy.getClusterMetrics(null, requestProto)); return new GetClusterMetricsResponsePBImpl(proxy.getClusterMetrics(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }
@ -149,13 +131,7 @@ public GetNewApplicationResponse getNewApplication(
try { try {
return new GetNewApplicationResponsePBImpl(proxy.getNewApplication(null, requestProto)); return new GetNewApplicationResponsePBImpl(proxy.getNewApplication(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }
@ -166,13 +142,7 @@ public SubmitApplicationResponse submitApplication(
try { try {
return new SubmitApplicationResponsePBImpl(proxy.submitApplication(null, requestProto)); return new SubmitApplicationResponsePBImpl(proxy.submitApplication(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }
@ -185,13 +155,7 @@ public GetAllApplicationsResponse getAllApplications(
return new GetAllApplicationsResponsePBImpl( return new GetAllApplicationsResponsePBImpl(
proxy.getAllApplications(null, requestProto)); proxy.getAllApplications(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }
@ -204,13 +168,7 @@ public GetClusterNodesResponse getClusterNodes(
return new GetClusterNodesResponsePBImpl( return new GetClusterNodesResponsePBImpl(
proxy.getClusterNodes(null, requestProto)); proxy.getClusterNodes(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }
@ -223,13 +181,7 @@ public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
return new GetQueueInfoResponsePBImpl( return new GetQueueInfoResponsePBImpl(
proxy.getQueueInfo(null, requestProto)); proxy.getQueueInfo(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }
@ -242,13 +194,7 @@ public GetQueueUserAclsInfoResponse getQueueUserAcls(
return new GetQueueUserAclsInfoResponsePBImpl( return new GetQueueUserAclsInfoResponsePBImpl(
proxy.getQueueUserAcls(null, requestProto)); proxy.getQueueUserAcls(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }
@ -261,13 +207,7 @@ public GetDelegationTokenResponse getDelegationToken(
return new GetDelegationTokenResponsePBImpl( return new GetDelegationTokenResponsePBImpl(
proxy.getDelegationToken(null, requestProto)); proxy.getDelegationToken(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }
} }

View File

@ -19,12 +19,13 @@
package org.apache.hadoop.yarn.api.impl.pb.client; package org.apache.hadoop.yarn.api.impl.pb.client;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.api.ContainerManager; import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.ContainerManagerPB;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@ -38,8 +39,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainerRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainerRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainerResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainerResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.ProtoOverHadoopRpcEngine; import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
import org.apache.hadoop.yarn.proto.ContainerManager.ContainerManagerService;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainerRequestProto;
@ -48,12 +48,12 @@
public class ContainerManagerPBClientImpl implements ContainerManager { public class ContainerManagerPBClientImpl implements ContainerManager {
private ContainerManagerService.BlockingInterface proxy; private ContainerManagerPB proxy;
public ContainerManagerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException { public ContainerManagerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, ContainerManagerService.BlockingInterface.class, ProtoOverHadoopRpcEngine.class); RPC.setProtocolEngine(conf, ContainerManagerPB.class, ProtobufRpcEngine.class);
proxy = (ContainerManagerService.BlockingInterface)RPC.getProxy( proxy = (ContainerManagerPB)RPC.getProxy(
ContainerManagerService.BlockingInterface.class, clientVersion, addr, conf); ContainerManagerPB.class, clientVersion, addr, conf);
} }
public void close() { public void close() {
@ -69,13 +69,7 @@ public GetContainerStatusResponse getContainerStatus(
try { try {
return new GetContainerStatusResponsePBImpl(proxy.getContainerStatus(null, requestProto)); return new GetContainerStatusResponsePBImpl(proxy.getContainerStatus(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }
@ -86,31 +80,20 @@ public StartContainerResponse startContainer(StartContainerRequest request)
try { try {
return new StartContainerResponsePBImpl(proxy.startContainer(null, requestProto)); return new StartContainerResponsePBImpl(proxy.startContainer(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }
@Override @Override
public StopContainerResponse stopContainer(StopContainerRequest request) public StopContainerResponse stopContainer(StopContainerRequest request)
throws YarnRemoteException { throws YarnRemoteException {
StopContainerRequestProto requestProto = ((StopContainerRequestPBImpl)request).getProto(); StopContainerRequestProto requestProto = ((StopContainerRequestPBImpl) request)
.getProto();
try { try {
return new StopContainerResponsePBImpl(proxy.stopContainer(null, requestProto)); return new StopContainerResponsePBImpl(proxy.stopContainer(null,
requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
} }
} }
} }
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.api.impl.pb.service; package org.apache.hadoop.yarn.api.impl.pb.service;
import org.apache.hadoop.yarn.api.AMRMProtocol; import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.AMRMProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
@ -29,7 +30,6 @@
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.proto.AMRMProtocol.AMRMProtocolService.BlockingInterface;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterRequestProto;
@ -40,7 +40,7 @@
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
public class AMRMProtocolPBServiceImpl implements BlockingInterface { public class AMRMProtocolPBServiceImpl implements AMRMProtocolPB {
private AMRMProtocol real; private AMRMProtocol real;

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.api.impl.pb.service; package org.apache.hadoop.yarn.api.impl.pb.service;
import org.apache.hadoop.yarn.api.ClientRMProtocol; import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.ClientRMProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
@ -50,7 +51,6 @@
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.proto.ClientRMProtocol.ClientRMProtocolService.BlockingInterface;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationReportRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationReportRequestProto;
@ -75,7 +75,7 @@
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
public class ClientRMProtocolPBServiceImpl implements BlockingInterface { public class ClientRMProtocolPBServiceImpl implements ClientRMProtocolPB {
private ClientRMProtocol real; private ClientRMProtocol real;

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.api.impl.pb.service; package org.apache.hadoop.yarn.api.impl.pb.service;
import org.apache.hadoop.yarn.api.ContainerManager; import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.ContainerManagerPB;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
@ -29,7 +30,6 @@
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainerRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainerRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainerResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainerResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.proto.ContainerManager.ContainerManagerService.BlockingInterface;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto;
@ -40,7 +40,7 @@
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
public class ContainerManagerPBServiceImpl implements BlockingInterface { public class ContainerManagerPBServiceImpl implements ContainerManagerPB {
private ContainerManager real; private ContainerManager real;

View File

@ -26,19 +26,23 @@
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.factories.RpcServerFactory; import org.apache.hadoop.yarn.factories.RpcServerFactory;
import org.apache.hadoop.yarn.ipc.ProtoOverHadoopRpcEngine;
import com.google.protobuf.BlockingService; import com.google.protobuf.BlockingService;
public class RpcServerFactoryPBImpl implements RpcServerFactory { public class RpcServerFactoryPBImpl implements RpcServerFactory {
private static final Log LOG = LogFactory.getLog(RpcServerFactoryPBImpl.class);
private static final String PROTO_GEN_PACKAGE_NAME = "org.apache.hadoop.yarn.proto"; private static final String PROTO_GEN_PACKAGE_NAME = "org.apache.hadoop.yarn.proto";
private static final String PROTO_GEN_CLASS_SUFFIX = "Service"; private static final String PROTO_GEN_CLASS_SUFFIX = "Service";
private static final String PB_IMPL_PACKAGE_SUFFIX = "impl.pb.service"; private static final String PB_IMPL_PACKAGE_SUFFIX = "impl.pb.service";
@ -96,6 +100,7 @@ public Server getServer(Class<?> protocol, Object instance,
throw new YarnException(e); throw new YarnException(e);
} }
Class<?> pbProtocol = service.getClass().getInterfaces()[0];
Method method = protoCache.get(protocol); Method method = protoCache.get(protocol);
if (method == null) { if (method == null) {
Class<?> protoClazz = null; Class<?> protoClazz = null;
@ -106,7 +111,8 @@ public Server getServer(Class<?> protocol, Object instance,
+ getProtoClassName(protocol) + "]", e); + getProtoClassName(protocol) + "]", e);
} }
try { try {
method = protoClazz.getMethod("newReflectiveBlockingService", service.getClass().getInterfaces()[0]); method = protoClazz.getMethod("newReflectiveBlockingService",
pbProtocol.getInterfaces()[0]);
method.setAccessible(true); method.setAccessible(true);
protoCache.putIfAbsent(protocol, method); protoCache.putIfAbsent(protocol, method);
} catch (NoSuchMethodException e) { } catch (NoSuchMethodException e) {
@ -115,7 +121,7 @@ public Server getServer(Class<?> protocol, Object instance,
} }
try { try {
return createServer(addr, conf, secretManager, numHandlers, return createServer(pbProtocol, addr, conf, secretManager, numHandlers,
(BlockingService)method.invoke(null, service)); (BlockingService)method.invoke(null, service));
} catch (InvocationTargetException e) { } catch (InvocationTargetException e) {
throw new YarnException(e); throw new YarnException(e);
@ -148,13 +154,15 @@ private String getPackageName(Class<?> clazz) {
return clazz.getPackage().getName(); return clazz.getPackage().getName();
} }
private Server createServer(InetSocketAddress addr, Configuration conf, private Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager, int numHandlers, SecretManager<? extends TokenIdentifier> secretManager, int numHandlers,
BlockingService blockingService) throws IOException { BlockingService blockingService) throws IOException {
RPC.setProtocolEngine(conf, BlockingService.class, ProtoOverHadoopRpcEngine.class); RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class);
Server server = RPC.getServer(BlockingService.class, blockingService, RPC.Server server = RPC.getServer(pbProtocol, blockingService,
addr.getHostName(), addr.getPort(), numHandlers, false, conf, addr.getHostName(), addr.getPort(), numHandlers, false, conf,
secretManager); secretManager);
LOG.info("Adding protocol "+pbProtocol.getCanonicalName()+" to the server");
server.addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, pbProtocol, blockingService);
return server; return server;
} }
} }

View File

@ -1,404 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.ipc;
import java.io.Closeable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputOutputStream;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtocolMetaInfoPB;
import org.apache.hadoop.ipc.ProtocolProxy;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcEngine;
import org.apache.hadoop.ipc.ClientCache;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
import org.apache.hadoop.yarn.ipc.RpcProtos.ProtoSpecificRpcRequest;
import org.apache.hadoop.yarn.ipc.RpcProtos.ProtoSpecificRpcResponse;
import com.google.protobuf.BlockingService;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@InterfaceStability.Evolving
public class ProtoOverHadoopRpcEngine implements RpcEngine {
private static final Log LOG = LogFactory.getLog(RPC.class);
private static final ClientCache CLIENTS=new ClientCache();
@Override
@SuppressWarnings("unchecked")
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout) throws IOException {
return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(protocol
.getClassLoader(), new Class[] { protocol }, new Invoker(protocol,
addr, ticket, conf, factory, rpcTimeout)), false);
}
@Override
public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
ConnectionId connId, Configuration conf, SocketFactory factory)
throws IOException {
Class<ProtocolMetaInfoPB> protocol = ProtocolMetaInfoPB.class;
return new ProtocolProxy<ProtocolMetaInfoPB>(protocol,
(ProtocolMetaInfoPB) Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[] { protocol }, new Invoker(protocol, connId, conf,
factory)), false);
}
private static class Invoker implements InvocationHandler, Closeable {
private Map<String, Message> returnTypes = new ConcurrentHashMap<String, Message>();
private boolean isClosed = false;
private Client.ConnectionId remoteId;
private Client client;
public Invoker(Class<?> protocol, InetSocketAddress addr,
UserGroupInformation ticket, Configuration conf, SocketFactory factory,
int rpcTimeout) throws IOException {
this(protocol, Client.ConnectionId.getConnectionId(addr, protocol,
ticket, rpcTimeout, conf), conf, factory);
}
public Invoker(Class<?> protocol, Client.ConnectionId connId,
Configuration conf, SocketFactory factory) {
this.remoteId = connId;
this.client = CLIENTS.getClient(conf, factory,
ProtoSpecificResponseWritable.class);
}
private ProtoSpecificRpcRequest constructRpcRequest(Method method,
Object[] params) throws ServiceException {
ProtoSpecificRpcRequest rpcRequest;
ProtoSpecificRpcRequest.Builder builder;
builder = ProtoSpecificRpcRequest.newBuilder();
builder.setMethodName(method.getName());
if (params.length != 2) { // RpcController + Message
throw new ServiceException("Too many parameters for request. Method: ["
+ method.getName() + "]" + ", Expected: 2, Actual: "
+ params.length);
}
if (params[1] == null) {
throw new ServiceException("null param while calling Method: ["
+ method.getName() + "]");
}
Message param = (Message) params[1];
builder.setRequestProto(param.toByteString());
rpcRequest = builder.build();
return rpcRequest;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
long startTime = 0;
if (LOG.isDebugEnabled()) {
startTime = System.currentTimeMillis();
}
ProtoSpecificRpcRequest rpcRequest = constructRpcRequest(method, args);
ProtoSpecificResponseWritable val = null;
try {
val = (ProtoSpecificResponseWritable) client.call(
new ProtoSpecificRequestWritable(rpcRequest), remoteId);
} catch (Exception e) {
throw new ServiceException(e);
}
ProtoSpecificRpcResponse response = val.message;
if (LOG.isDebugEnabled()) {
long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
}
if (response.hasIsError() && response.getIsError() == true) {
YarnRemoteExceptionPBImpl exception = new YarnRemoteExceptionPBImpl(response.getException());
exception.fillInStackTrace();
ServiceException se = new ServiceException(exception);
throw se;
}
Message prototype = null;
try {
prototype = getReturnProtoType(method);
} catch (Exception e) {
throw new ServiceException(e);
}
Message actualReturnMessage = prototype.newBuilderForType()
.mergeFrom(response.getResponseProto()).build();
return actualReturnMessage;
}
@Override
public void close() throws IOException {
if (!isClosed) {
isClosed = true;
CLIENTS.stopClient(client);
}
}
private Message getReturnProtoType(Method method) throws Exception {
if (returnTypes.containsKey(method.getName())) {
return returnTypes.get(method.getName());
} else {
Class<?> returnType = method.getReturnType();
Method newInstMethod = returnType.getMethod("getDefaultInstance");
newInstMethod.setAccessible(true);
Message prototype = (Message) newInstMethod.invoke(null,
(Object[]) null);
returnTypes.put(method.getName(), prototype);
return prototype;
}
}
}
/**
* Writable Wrapper for Protocol Buffer Requests
*/
private static class ProtoSpecificRequestWritable implements Writable {
ProtoSpecificRpcRequest message;
@SuppressWarnings("unused")
public ProtoSpecificRequestWritable() {
}
ProtoSpecificRequestWritable(ProtoSpecificRpcRequest message) {
this.message = message;
}
@Override
public void write(DataOutput out) throws IOException {
((Message)message).writeDelimitedTo(
DataOutputOutputStream.constructOutputStream(out));
}
@Override
public void readFields(DataInput in) throws IOException {
int length = ProtoUtil.readRawVarint32(in);
byte[] bytes = new byte[length];
in.readFully(bytes);
message = ProtoSpecificRpcRequest.parseFrom(bytes);
}
}
/**
* Writable Wrapper for Protocol Buffer Responses
*/
public static class ProtoSpecificResponseWritable implements Writable {
ProtoSpecificRpcResponse message;
public ProtoSpecificResponseWritable() {
}
public ProtoSpecificResponseWritable(ProtoSpecificRpcResponse message) {
this.message = message;
}
@Override
public void write(DataOutput out) throws IOException {
((Message)message).writeDelimitedTo(
DataOutputOutputStream.constructOutputStream(out));
}
@Override
public void readFields(DataInput in) throws IOException {
int length = ProtoUtil.readRawVarint32(in);
byte[] bytes = new byte[length];
in.readFully(bytes);
message = ProtoSpecificRpcResponse.parseFrom(bytes);
}
}
@Override
public Object[] call(Method method, Object[][] params,
InetSocketAddress[] addrs, UserGroupInformation ticket, Configuration conf)
throws IOException, InterruptedException {
throw new UnsupportedOperationException();
}
// for unit testing only
@InterfaceAudience.Private
@InterfaceStability.Unstable
static Client getClient(Configuration conf) {
return CLIENTS.getClient(conf, SocketFactory.getDefault(),
ProtoSpecificResponseWritable.class);
}
public static class Server extends RPC.Server {
private BlockingService service;
private boolean verbose;
//
// /**
// * Construct an RPC server.
// *
// * @param instance
// * the instance whose methods will be called
// * @param conf
// * the configuration to use
// * @param bindAddress
// * the address to bind on to listen for connection
// * @param port
// * the port to listen for connections on
// */
// public Server(Object instance, Configuration conf, String bindAddress,
// int port) throws IOException {
// this(instance, conf, bindAddress, port, 1, false, null);
// }
private static String classNameBase(String className) {
String[] names = className.split("\\.", -1);
if (names == null || names.length == 0) {
return className;
}
return names[names.length - 1];
}
/**
* Construct an RPC server.
*
* @param instance
* the instance whose methods will be called
* @param conf
* the configuration to use
* @param bindAddress
* the address to bind on to listen for connection
* @param port
* the port to listen for connections on
* @param numHandlers
* the number of method handler threads to run
* @param verbose
* whether each call should be logged
*/
public Server(Object instance, Configuration conf, String bindAddress,
int port, int numHandlers, int numReaders,
int queueSizePerHandler, boolean verbose,
SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
super(bindAddress, port, ProtoSpecificRequestWritable.class, numHandlers,
numReaders, queueSizePerHandler, conf, classNameBase(instance.getClass().getName()), secretManager);
this.service = (BlockingService) instance;
this.verbose = verbose;
}
@Override
public Writable call(RpcKind rpcKind, String protocol,
Writable writableRequest, long receiveTime) throws IOException {
ProtoSpecificRequestWritable request = (ProtoSpecificRequestWritable) writableRequest;
ProtoSpecificRpcRequest rpcRequest = request.message;
String methodName = rpcRequest.getMethodName();
if (verbose) {
log("Call: protocol=" + protocol + ", method="
+ methodName);
}
MethodDescriptor methodDescriptor = service.getDescriptorForType()
.findMethodByName(methodName);
if (methodDescriptor == null) {
String msg = "Unknown method " + methodName + " called on "
+ protocol + " protocol.";
LOG.warn(msg);
return handleException(new IOException(msg));
}
Message prototype = service.getRequestPrototype(methodDescriptor);
Message param = prototype.newBuilderForType()
.mergeFrom(rpcRequest.getRequestProto()).build();
Message result;
try {
result = service.callBlockingMethod(methodDescriptor, null, param);
} catch (ServiceException e) {
e.printStackTrace();
return handleException(e);
} catch (Exception e) {
return handleException(e);
}
ProtoSpecificRpcResponse response = constructProtoSpecificRpcSuccessResponse(result);
return new ProtoSpecificResponseWritable(response);
}
private ProtoSpecificResponseWritable handleException(Throwable e) {
ProtoSpecificRpcResponse.Builder builder = ProtoSpecificRpcResponse
.newBuilder();
builder.setIsError(true);
if (e.getCause() instanceof YarnRemoteExceptionPBImpl) {
builder.setException(((YarnRemoteExceptionPBImpl) e.getCause())
.getProto());
} else {
builder.setException(new YarnRemoteExceptionPBImpl(e).getProto());
}
ProtoSpecificRpcResponse response = builder.build();
return new ProtoSpecificResponseWritable(response);
}
private ProtoSpecificRpcResponse constructProtoSpecificRpcSuccessResponse(
Message message) {
ProtoSpecificRpcResponse res = ProtoSpecificRpcResponse.newBuilder()
.setResponseProto(message.toByteString()).build();
return res;
}
}
private static void log(String value) {
if (value != null && value.length() > 55)
value = value.substring(0, 55) + "...";
LOG.info(value);
}
@Override
public RPC.Server getServer(Class<?> protocol, Object instance,
String bindAddress, int port, int numHandlers,int numReaders,
int queueSizePerHandler, boolean verbose,
Configuration conf, SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
return new Server(instance, conf, bindAddress, port, numHandlers, numReaders, queueSizePerHandler,
verbose, secretManager);
}
}

View File

@ -26,7 +26,7 @@
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenInfo; import org.apache.hadoop.security.token.TokenInfo;
import org.apache.hadoop.security.token.TokenSelector; import org.apache.hadoop.security.token.TokenSelector;
import org.apache.hadoop.yarn.proto.ContainerManager; import org.apache.hadoop.yarn.api.ContainerManagerPB;
public class ContainerManagerSecurityInfo extends SecurityInfo { public class ContainerManagerSecurityInfo extends SecurityInfo {
@ -38,7 +38,7 @@ public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) {
@Override @Override
public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) { public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) {
if (!protocol if (!protocol
.equals(ContainerManager.ContainerManagerService.BlockingInterface.class)) { .equals(ContainerManagerPB.class)) {
return null; return null;
} }
return new TokenInfo() { return new TokenInfo() {

View File

@ -26,7 +26,7 @@
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenInfo; import org.apache.hadoop.security.token.TokenInfo;
import org.apache.hadoop.security.token.TokenSelector; import org.apache.hadoop.security.token.TokenSelector;
import org.apache.hadoop.yarn.proto.AMRMProtocol; import org.apache.hadoop.yarn.api.AMRMProtocolPB;
public class SchedulerSecurityInfo extends SecurityInfo { public class SchedulerSecurityInfo extends SecurityInfo {
@ -37,7 +37,7 @@ public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) {
@Override @Override
public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) { public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) {
if (!protocol.equals(AMRMProtocol.AMRMProtocolService.BlockingInterface.class)) { if (!protocol.equals(AMRMProtocolPB.class)) {
return null; return null;
} }
return new TokenInfo() { return new TokenInfo() {

View File

@ -26,15 +26,15 @@
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenInfo; import org.apache.hadoop.security.token.TokenInfo;
import org.apache.hadoop.security.token.TokenSelector; import org.apache.hadoop.security.token.TokenSelector;
import org.apache.hadoop.yarn.api.ClientRMProtocolPB;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.ClientRMProtocol;
public class ClientRMSecurityInfo extends SecurityInfo { public class ClientRMSecurityInfo extends SecurityInfo {
@Override @Override
public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) { public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) {
if (!protocol if (!protocol
.equals(ClientRMProtocol.ClientRMProtocolService.BlockingInterface.class)) { .equals(ClientRMProtocolPB.class)) {
return null; return null;
} }
return new KerberosInfo() { return new KerberosInfo() {
@ -59,7 +59,7 @@ public String clientPrincipal() {
@Override @Override
public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) { public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) {
if (!protocol if (!protocol
.equals(ClientRMProtocol.ClientRMProtocolService.BlockingInterface.class)) { .equals(ClientRMProtocolPB.class)) {
return null; return null;
} }
return new TokenInfo() { return new TokenInfo() {

View File

@ -22,11 +22,14 @@
import junit.framework.Assert; import junit.framework.Assert;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.ClientRMProtocol; import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.ContainerManager; import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.ContainerManagerPB;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
@ -84,6 +87,8 @@ public void testUnknownCall() {
"Unknown method getNewApplication called on.*" "Unknown method getNewApplication called on.*"
+ "org.apache.hadoop.yarn.proto.ClientRMProtocol" + "org.apache.hadoop.yarn.proto.ClientRMProtocol"
+ "\\$ClientRMProtocolService\\$BlockingInterface protocol.")); + "\\$ClientRMProtocolService\\$BlockingInterface protocol."));
} catch (Exception e) {
e.printStackTrace();
} }
} }
@ -101,6 +106,7 @@ private void test(String rpcClass) throws Exception {
Server server = rpc.getServer(ContainerManager.class, Server server = rpc.getServer(ContainerManager.class,
new DummyContainerManager(), addr, conf, null, 1); new DummyContainerManager(), addr, conf, null, 1);
server.start(); server.start();
RPC.setProtocolEngine(conf, ContainerManagerPB.class, ProtobufRpcEngine.class);
ContainerManager proxy = (ContainerManager) ContainerManager proxy = (ContainerManager)
rpc.getProxy(ContainerManager.class, rpc.getProxy(ContainerManager.class,
NetUtils.createSocketAddr("localhost:" + server.getPort()), conf); NetUtils.createSocketAddr("localhost:" + server.getPort()), conf);
@ -144,11 +150,11 @@ private void test(String rpcClass) throws Exception {
proxy.stopContainer(stopRequest); proxy.stopContainer(stopRequest);
} catch (YarnRemoteException e) { } catch (YarnRemoteException e) {
exception = true; exception = true;
System.err.println(e.getMessage()); Assert.assertTrue(e.getMessage().contains(EXCEPTION_MSG));
System.err.println(e.getCause().getMessage()); Assert.assertTrue(e.getMessage().contains(EXCEPTION_CAUSE));
Assert.assertTrue(EXCEPTION_MSG.equals(e.getMessage()));
Assert.assertTrue(EXCEPTION_CAUSE.equals(e.getCause().getMessage()));
System.out.println("Test Exception is " + RPCUtil.toString(e)); System.out.println("Test Exception is " + RPCUtil.toString(e));
} catch (Exception ex) {
ex.printStackTrace();
} }
Assert.assertTrue(exception); Assert.assertTrue(exception);

View File

@ -25,13 +25,13 @@
import org.apache.hadoop.security.SecurityInfo; import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.token.TokenInfo; import org.apache.hadoop.security.token.TokenInfo;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.ResourceTracker; import org.apache.hadoop.yarn.server.api.ResourceTrackerPB;
public class RMNMSecurityInfoClass extends SecurityInfo { public class RMNMSecurityInfoClass extends SecurityInfo {
@Override @Override
public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) { public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) {
if (!protocol.equals(ResourceTracker.ResourceTrackerService.BlockingInterface.class)) { if (!protocol.equals(ResourceTrackerPB.class)) {
return null; return null;
} }
return new KerberosInfo() { return new KerberosInfo() {

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.yarn.proto.ResourceTracker.ResourceTrackerService;
@ProtocolInfo(
protocolName = "org.apache.hadoop.yarn.server.api.ResourceTrackerPB",
protocolVersion = 1)
public interface ResourceTrackerPB extends ResourceTrackerService.BlockingInterface {
}

View File

@ -19,17 +19,17 @@
package org.apache.hadoop.yarn.server.api.impl.pb.client; package org.apache.hadoop.yarn.server.api.impl.pb.client;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.ProtoOverHadoopRpcEngine; import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
import org.apache.hadoop.yarn.proto.ResourceTracker.ResourceTrackerService;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.ResourceTrackerPB;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@ -43,12 +43,12 @@
public class ResourceTrackerPBClientImpl implements ResourceTracker { public class ResourceTrackerPBClientImpl implements ResourceTracker {
private ResourceTrackerService.BlockingInterface proxy; private ResourceTrackerPB proxy;
public ResourceTrackerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException { public ResourceTrackerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, ResourceTrackerService.BlockingInterface.class, ProtoOverHadoopRpcEngine.class); RPC.setProtocolEngine(conf, ResourceTrackerPB.class, ProtobufRpcEngine.class);
proxy = (ResourceTrackerService.BlockingInterface)RPC.getProxy( proxy = (ResourceTrackerPB)RPC.getProxy(
ResourceTrackerService.BlockingInterface.class, clientVersion, addr, conf); ResourceTrackerPB.class, clientVersion, addr, conf);
} }
@Override @Override
@ -58,13 +58,7 @@ public RegisterNodeManagerResponse registerNodeManager(
try { try {
return new RegisterNodeManagerResponsePBImpl(proxy.registerNodeManager(null, requestProto)); return new RegisterNodeManagerResponsePBImpl(proxy.registerNodeManager(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }
@ -75,13 +69,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
try { try {
return new NodeHeartbeatResponsePBImpl(proxy.nodeHeartbeat(null, requestProto)); return new NodeHeartbeatResponsePBImpl(proxy.nodeHeartbeat(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }

View File

@ -19,12 +19,12 @@
package org.apache.hadoop.yarn.server.api.impl.pb.service; package org.apache.hadoop.yarn.server.api.impl.pb.service;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.proto.ResourceTracker.ResourceTrackerService.BlockingInterface;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerResponseProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerResponseProto;
import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.ResourceTrackerPB;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRequestPBImpl;
@ -35,7 +35,7 @@
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
public class ResourceTrackerPBServiceImpl implements BlockingInterface { public class ResourceTrackerPBServiceImpl implements ResourceTrackerPB {
private ResourceTracker real; private ResourceTracker real;

View File

@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.api;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.yarn.proto.LocalizationProtocol.LocalizationProtocolService;
@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocolPB",
protocolVersion = 1)
public interface LocalizationProtocolPB extends LocalizationProtocolService.BlockingInterface {
}

View File

@ -18,32 +18,31 @@
package org.apache.hadoop.yarn.server.nodemanager.api.impl.pb.client; package org.apache.hadoop.yarn.server.nodemanager.api.impl.pb.client;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.ProtoOverHadoopRpcEngine; import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
import org.apache.hadoop.yarn.proto.LocalizationProtocol.LocalizationProtocolService; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerStatusProto;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol; import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocolPB;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb.LocalizerHeartbeatResponsePBImpl; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb.LocalizerHeartbeatResponsePBImpl;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb.LocalizerStatusPBImpl; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb.LocalizerStatusPBImpl;
import static org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerStatusProto;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
public class LocalizationProtocolPBClientImpl implements LocalizationProtocol { public class LocalizationProtocolPBClientImpl implements LocalizationProtocol {
private LocalizationProtocolService.BlockingInterface proxy; private LocalizationProtocolPB proxy;
public LocalizationProtocolPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException { public LocalizationProtocolPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, LocalizationProtocolService.BlockingInterface.class, ProtoOverHadoopRpcEngine.class); RPC.setProtocolEngine(conf, LocalizationProtocolPB.class, ProtobufRpcEngine.class);
proxy = (LocalizationProtocolService.BlockingInterface)RPC.getProxy( proxy = (LocalizationProtocolPB)RPC.getProxy(
LocalizationProtocolService.BlockingInterface.class, clientVersion, addr, conf); LocalizationProtocolPB.class, clientVersion, addr, conf);
} }
@Override @Override
@ -54,13 +53,7 @@ public LocalizerHeartbeatResponse heartbeat(LocalizerStatus status)
return new LocalizerHeartbeatResponsePBImpl( return new LocalizerHeartbeatResponsePBImpl(
proxy.heartbeat(null, statusProto)); proxy.heartbeat(null, statusProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }

View File

@ -24,13 +24,13 @@
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.proto.LocalizationProtocol.LocalizationProtocolService.BlockingInterface;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerHeartbeatResponseProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerHeartbeatResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerStatusProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerStatusProto;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol; import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocolPB;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
public class LocalizationProtocolPBServiceImpl implements BlockingInterface { public class LocalizationProtocolPBServiceImpl implements LocalizationProtocolPB {
private LocalizationProtocol real; private LocalizationProtocol real;

View File

@ -26,7 +26,7 @@
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenInfo; import org.apache.hadoop.security.token.TokenInfo;
import org.apache.hadoop.security.token.TokenSelector; import org.apache.hadoop.security.token.TokenSelector;
import org.apache.hadoop.yarn.proto.LocalizationProtocol; import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocolPB;
public class LocalizerSecurityInfo extends SecurityInfo { public class LocalizerSecurityInfo extends SecurityInfo {
@ -38,7 +38,7 @@ public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) {
@Override @Override
public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) { public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) {
if (!protocol if (!protocol
.equals(LocalizationProtocol.LocalizationProtocolService.BlockingInterface.class)) { .equals(LocalizationProtocolPB.class)) {
return null; return null;
} }
return new TokenInfo() { return new TokenInfo() {

View File

@ -21,9 +21,9 @@
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.Service; import org.apache.hadoop.security.authorize.Service;
import org.apache.hadoop.yarn.proto.ContainerManager; import org.apache.hadoop.yarn.api.ContainerManagerPB;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.LocalizationProtocol; import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocolPB;
/** /**
* {@link PolicyProvider} for YARN NodeManager protocols. * {@link PolicyProvider} for YARN NodeManager protocols.
@ -36,9 +36,9 @@ public class NMPolicyProvider extends PolicyProvider {
new Service[] { new Service[] {
new Service( new Service(
YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_CONTAINER_MANAGER, YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_CONTAINER_MANAGER,
ContainerManager.ContainerManagerService.BlockingInterface.class), ContainerManagerPB.class),
new Service(YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCE_LOCALIZER, new Service(YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCE_LOCALIZER,
LocalizationProtocol.LocalizationProtocolService.BlockingInterface.class) LocalizationProtocolPB.class)
}; };
@Override @Override

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.api;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.yarn.proto.RMAdminProtocol.RMAdminProtocolService;
@ProtocolInfo(
protocolName = "org.apache.hadoop.yarn.server.nodemanager.api.RMAdminProtocolPB",
protocolVersion = 1)
public interface RMAdminProtocolPB extends RMAdminProtocolService.BlockingInterface {
}

View File

@ -23,10 +23,10 @@
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.ProtoOverHadoopRpcEngine; import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
import org.apache.hadoop.yarn.proto.RMAdminProtocol.RMAdminProtocolService;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshQueuesRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshQueuesRequestProto;
@ -34,6 +34,7 @@
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshSuperUserGroupsConfigurationRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshSuperUserGroupsConfigurationRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshUserToGroupsMappingsRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshUserToGroupsMappingsRequestProto;
import org.apache.hadoop.yarn.server.resourcemanager.api.RMAdminProtocol; import org.apache.hadoop.yarn.server.resourcemanager.api.RMAdminProtocol;
import org.apache.hadoop.yarn.server.resourcemanager.api.RMAdminProtocolPB;
import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshAdminAclsRequest; import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshAdminAclsResponse; import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshAdminAclsResponse;
import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshNodesRequest; import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshNodesRequest;
@ -64,14 +65,14 @@
public class RMAdminProtocolPBClientImpl implements RMAdminProtocol { public class RMAdminProtocolPBClientImpl implements RMAdminProtocol {
private RMAdminProtocolService.BlockingInterface proxy; private RMAdminProtocolPB proxy;
public RMAdminProtocolPBClientImpl(long clientVersion, InetSocketAddress addr, public RMAdminProtocolPBClientImpl(long clientVersion, InetSocketAddress addr,
Configuration conf) throws IOException { Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, RMAdminProtocolService.BlockingInterface.class, RPC.setProtocolEngine(conf, RMAdminProtocolPB.class,
ProtoOverHadoopRpcEngine.class); ProtobufRpcEngine.class);
proxy = (RMAdminProtocolService.BlockingInterface)RPC.getProxy( proxy = (RMAdminProtocolPB)RPC.getProxy(
RMAdminProtocolService.BlockingInterface.class, clientVersion, addr, conf); RMAdminProtocolPB.class, clientVersion, addr, conf);
} }
@Override @Override
@ -83,13 +84,7 @@ public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
return new RefreshQueuesResponsePBImpl( return new RefreshQueuesResponsePBImpl(
proxy.refreshQueues(null, requestProto)); proxy.refreshQueues(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }
@ -102,13 +97,7 @@ public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
return new RefreshNodesResponsePBImpl( return new RefreshNodesResponsePBImpl(
proxy.refreshNodes(null, requestProto)); proxy.refreshNodes(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }
@ -122,13 +111,7 @@ public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfigu
return new RefreshSuperUserGroupsConfigurationResponsePBImpl( return new RefreshSuperUserGroupsConfigurationResponsePBImpl(
proxy.refreshSuperUserGroupsConfiguration(null, requestProto)); proxy.refreshSuperUserGroupsConfiguration(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }
@ -141,13 +124,7 @@ public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
return new RefreshUserToGroupsMappingsResponsePBImpl( return new RefreshUserToGroupsMappingsResponsePBImpl(
proxy.refreshUserToGroupsMappings(null, requestProto)); proxy.refreshUserToGroupsMappings(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }
@ -160,13 +137,7 @@ public RefreshAdminAclsResponse refreshAdminAcls(
return new RefreshAdminAclsResponsePBImpl( return new RefreshAdminAclsResponsePBImpl(
proxy.refreshAdminAcls(null, requestProto)); proxy.refreshAdminAcls(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }
@ -176,16 +147,10 @@ public RefreshServiceAclsResponse refreshServiceAcls(
RefreshServiceAclsRequestProto requestProto = RefreshServiceAclsRequestProto requestProto =
((RefreshServiceAclsRequestPBImpl)request).getProto(); ((RefreshServiceAclsRequestPBImpl)request).getProto();
try { try {
return new RefreshServiceAclsResponsePBImpl( return new RefreshServiceAclsResponsePBImpl(proxy.refreshServiceAcls(
proxy.refreshServiceAcls(null, requestProto)); null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
throw (YarnRemoteException)e.getCause();
} else if (e.getCause() instanceof UndeclaredThrowableException) {
throw (UndeclaredThrowableException)e.getCause();
} else {
throw new UndeclaredThrowableException(e);
}
} }
} }

View File

@ -19,11 +19,11 @@
package org.apache.hadoop.yarn.server.resourcemanager.api.impl.pb.service; package org.apache.hadoop.yarn.server.resourcemanager.api.impl.pb.service;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.proto.RMAdminProtocol.RMAdminProtocolService.BlockingInterface;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshServiceAclsRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshServiceAclsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshServiceAclsResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshServiceAclsResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.*; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.*;
import org.apache.hadoop.yarn.server.resourcemanager.api.RMAdminProtocol; import org.apache.hadoop.yarn.server.resourcemanager.api.RMAdminProtocol;
import org.apache.hadoop.yarn.server.resourcemanager.api.RMAdminProtocolPB;
import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshAdminAclsResponse; import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshAdminAclsResponse;
import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshNodesResponse; import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshNodesResponse;
import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshQueuesResponse; import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshQueuesResponse;
@ -46,7 +46,7 @@
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
public class RMAdminProtocolPBServiceImpl implements BlockingInterface { public class RMAdminProtocolPBServiceImpl implements RMAdminProtocolPB {
private RMAdminProtocol real; private RMAdminProtocol real;

View File

@ -25,13 +25,13 @@
import org.apache.hadoop.security.SecurityInfo; import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.token.TokenInfo; import org.apache.hadoop.security.token.TokenInfo;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.RMAdminProtocol; import org.apache.hadoop.yarn.server.resourcemanager.api.RMAdminProtocolPB;
public class AdminSecurityInfo extends SecurityInfo { public class AdminSecurityInfo extends SecurityInfo {
@Override @Override
public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) { public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) {
if (!protocol.equals(RMAdminProtocol.RMAdminProtocolService.BlockingInterface.class)) { if (!protocol.equals(RMAdminProtocolPB.class)) {
return null; return null;
} }
return new KerberosInfo() { return new KerberosInfo() {

View File

@ -21,12 +21,12 @@
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.Service; import org.apache.hadoop.security.authorize.Service;
import org.apache.hadoop.yarn.api.AMRMProtocolPB;
import org.apache.hadoop.yarn.api.ClientRMProtocolPB;
import org.apache.hadoop.yarn.api.ContainerManagerPB;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.ContainerManager; import org.apache.hadoop.yarn.server.api.ResourceTrackerPB;
import org.apache.hadoop.yarn.proto.ResourceTracker; import org.apache.hadoop.yarn.server.resourcemanager.api.RMAdminProtocolPB;
import org.apache.hadoop.yarn.proto.RMAdminProtocol;
import org.apache.hadoop.yarn.proto.ClientRMProtocol;
import org.apache.hadoop.yarn.proto.AMRMProtocol;
/** /**
* {@link PolicyProvider} for YARN ResourceManager protocols. * {@link PolicyProvider} for YARN ResourceManager protocols.
@ -39,19 +39,19 @@ public class RMPolicyProvider extends PolicyProvider {
new Service[] { new Service[] {
new Service( new Service(
YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCETRACKER, YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCETRACKER,
ResourceTracker.ResourceTrackerService.BlockingInterface.class), ResourceTrackerPB.class),
new Service( new Service(
YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_CLIENT_RESOURCEMANAGER, YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_CLIENT_RESOURCEMANAGER,
ClientRMProtocol.ClientRMProtocolService.BlockingInterface.class), ClientRMProtocolPB.class),
new Service( new Service(
YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_APPLICATIONMASTER_RESOURCEMANAGER, YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_APPLICATIONMASTER_RESOURCEMANAGER,
AMRMProtocol.AMRMProtocolService.BlockingInterface.class), AMRMProtocolPB.class),
new Service( new Service(
YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_ADMIN, YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_ADMIN,
RMAdminProtocol.RMAdminProtocolService.BlockingInterface.class), RMAdminProtocolPB.class),
new Service( new Service(
YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_CONTAINER_MANAGER, YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_CONTAINER_MANAGER,
ContainerManager.ContainerManagerService.BlockingInterface.class), ContainerManagerPB.class),
}; };
@Override @Override

View File

@ -235,10 +235,11 @@ public AMRMProtocol run() {
client.registerApplicationMaster(request); client.registerApplicationMaster(request);
Assert.fail("Should fail with authorization error"); Assert.fail("Should fail with authorization error");
} catch (YarnRemoteException e) { } catch (YarnRemoteException e) {
Assert.assertEquals("Unauthorized request from ApplicationMaster. " Assert.assertTrue(e.getMessage().contains(
"Unauthorized request from ApplicationMaster. "
+ "Expected ApplicationAttemptID: " + "Expected ApplicationAttemptID: "
+ applicationAttemptId.toString() + " Found: " + applicationAttemptId.toString() + " Found: "
+ otherAppAttemptId.toString(), e.getMessage()); + otherAppAttemptId.toString()));
} finally { } finally {
rm.stop(); rm.stop();
} }

View File

@ -323,8 +323,10 @@ private void verifyEnemyAccess() throws Exception {
Assert.fail("App killing by the enemy should fail!!"); Assert.fail("App killing by the enemy should fail!!");
} catch (YarnRemoteException e) { } catch (YarnRemoteException e) {
LOG.info("Got exception while killing app as the enemy", e); LOG.info("Got exception while killing app as the enemy", e);
Assert.assertEquals("User enemy cannot perform operation MODIFY_APP on " Assert
+ applicationId, e.getMessage()); .assertTrue(e.getMessage().contains(
"User enemy cannot perform operation MODIFY_APP on "
+ applicationId));
} }
rmClient.forceKillApplication(finishAppRequest); rmClient.forceKillApplication(finishAppRequest);

View File

@ -247,10 +247,12 @@ public Void run() {
Assert.assertEquals( Assert.assertEquals(
java.lang.reflect.UndeclaredThrowableException.class java.lang.reflect.UndeclaredThrowableException.class
.getCanonicalName(), e.getClass().getCanonicalName()); .getCanonicalName(), e.getClass().getCanonicalName());
Assert.assertEquals( Assert.assertTrue(e
.getCause()
.getMessage()
.contains(
"DIGEST-MD5: digest response format violation. " "DIGEST-MD5: digest response format violation. "
+ "Mismatched response.", e.getCause().getCause() + "Mismatched response."));
.getMessage());
} }
return null; return null;
} }
@ -468,9 +470,10 @@ void callWithIllegalContainerID(ContainerManager client,
+ "access is expected to fail."); + "access is expected to fail.");
} catch (YarnRemoteException e) { } catch (YarnRemoteException e) {
LOG.info("Got exception : ", e); LOG.info("Got exception : ", e);
Assert.assertEquals("Unauthorized request to start container. " Assert.assertTrue(e.getMessage().contains(
"Unauthorized request to start container. "
+ "\nExpected containerId: " + tokenId.getContainerID() + "\nExpected containerId: " + tokenId.getContainerID()
+ " Found: " + newContainerId.toString(), e.getMessage()); + " Found: " + newContainerId.toString()));
} }
} }