YARN-5162. Fix Exceptions thrown during in registerAM call when Distributed Scheduling is Enabled (Hitesh Sharma via asuresh)
(cherry picked from commit 5b41b288d0
)
This commit is contained in:
parent
b4c8729cf9
commit
44cbf5b7f5
|
@ -41,7 +41,7 @@ public class SchedulerSecurityInfo extends SecurityInfo {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) {
|
public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) {
|
||||||
if (!protocol.equals(ApplicationMasterProtocolPB.class)) {
|
if (!ApplicationMasterProtocolPB.class.isAssignableFrom(protocol)) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
return new TokenInfo() {
|
return new TokenInfo() {
|
||||||
|
|
|
@ -21,16 +21,16 @@ package org.apache.hadoop.yarn.server.api;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.ipc.ProtocolInfo;
|
import org.apache.hadoop.ipc.ProtocolInfo;
|
||||||
|
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
|
||||||
import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
|
import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
|
||||||
|
import org.apache.hadoop.yarn.proto.DistributedSchedulerProtocol.DistributedSchedulerProtocolService;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.proto.DistributedSchedulerProtocol;
|
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB",
|
@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB",
|
||||||
protocolVersion = 1)
|
protocolVersion = 1)
|
||||||
public interface DistributedSchedulerProtocolPB extends
|
public interface DistributedSchedulerProtocolPB extends
|
||||||
DistributedSchedulerProtocol.DistributedSchedulerProtocolService.BlockingInterface,
|
DistributedSchedulerProtocolService.BlockingInterface,
|
||||||
ApplicationMasterProtocolService.BlockingInterface {
|
ApplicationMasterProtocolService.BlockingInterface,
|
||||||
|
ApplicationMasterProtocolPB {
|
||||||
}
|
}
|
||||||
|
|
|
@ -94,9 +94,13 @@ public class DistSchedRegisterResponsePBImpl extends DistSchedRegisterResponse {
|
||||||
ProtoUtils.convertToProtoFormat(this.maxAllocatableCapability));
|
ProtoUtils.convertToProtoFormat(this.maxAllocatableCapability));
|
||||||
}
|
}
|
||||||
if (this.minAllocatableCapability != null) {
|
if (this.minAllocatableCapability != null) {
|
||||||
builder.setMaxAllocCapability(
|
builder.setMinAllocCapability(
|
||||||
ProtoUtils.convertToProtoFormat(this.minAllocatableCapability));
|
ProtoUtils.convertToProtoFormat(this.minAllocatableCapability));
|
||||||
}
|
}
|
||||||
|
if (this.incrAllocatableCapability != null) {
|
||||||
|
builder.setIncrAllocCapability(
|
||||||
|
ProtoUtils.convertToProtoFormat(this.incrAllocatableCapability));
|
||||||
|
}
|
||||||
if (this.registerApplicationMasterResponse != null) {
|
if (this.registerApplicationMasterResponse != null) {
|
||||||
builder.setRegisterResponse(
|
builder.setRegisterResponse(
|
||||||
((RegisterApplicationMasterResponsePBImpl)
|
((RegisterApplicationMasterResponsePBImpl)
|
||||||
|
|
|
@ -34,5 +34,6 @@ import "yarn_server_common_service_protos.proto";
|
||||||
|
|
||||||
service DistributedSchedulerProtocolService {
|
service DistributedSchedulerProtocolService {
|
||||||
rpc registerApplicationMasterForDistributedScheduling (RegisterApplicationMasterRequestProto) returns (DistSchedRegisterResponseProto);
|
rpc registerApplicationMasterForDistributedScheduling (RegisterApplicationMasterRequestProto) returns (DistSchedRegisterResponseProto);
|
||||||
|
rpc finishApplicationMaster (FinishApplicationMasterRequestProto) returns (FinishApplicationMasterResponseProto);
|
||||||
rpc allocateForDistributedScheduling (AllocateRequestProto) returns (DistSchedAllocateResponseProto);
|
rpc allocateForDistributedScheduling (AllocateRequestProto) returns (DistSchedAllocateResponseProto);
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
|
||||||
.RegisterApplicationMasterRequestPBImpl;
|
.RegisterApplicationMasterRequestPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
|
||||||
.RegisterApplicationMasterResponsePBImpl;
|
.RegisterApplicationMasterResponsePBImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
|
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
|
@ -132,6 +133,9 @@ public class TestDistributedSchedulingService {
|
||||||
DistSchedRegisterResponse resp = factory.newRecordInstance(
|
DistSchedRegisterResponse resp = factory.newRecordInstance(
|
||||||
DistSchedRegisterResponse.class);
|
DistSchedRegisterResponse.class);
|
||||||
resp.setContainerIdStart(54321l);
|
resp.setContainerIdStart(54321l);
|
||||||
|
resp.setMaxAllocatableCapabilty(Resource.newInstance(4096, 4));
|
||||||
|
resp.setMinAllocatableCapabilty(Resource.newInstance(1024, 1));
|
||||||
|
resp.setIncrAllocatableCapabilty(Resource.newInstance(2048, 2));
|
||||||
return resp;
|
return resp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -194,6 +198,13 @@ public class TestDistributedSchedulingService {
|
||||||
.newRecordInstance(RegisterApplicationMasterRequest.class))
|
.newRecordInstance(RegisterApplicationMasterRequest.class))
|
||||||
.getProto()));
|
.getProto()));
|
||||||
Assert.assertEquals(54321l, dsRegResp.getContainerIdStart());
|
Assert.assertEquals(54321l, dsRegResp.getContainerIdStart());
|
||||||
|
Assert.assertEquals(4,
|
||||||
|
dsRegResp.getMaxAllocatableCapabilty().getVirtualCores());
|
||||||
|
Assert.assertEquals(1024,
|
||||||
|
dsRegResp.getMinAllocatableCapabilty().getMemory());
|
||||||
|
Assert.assertEquals(2,
|
||||||
|
dsRegResp.getIncrAllocatableCapabilty().getVirtualCores());
|
||||||
|
|
||||||
DistSchedAllocateResponse dsAllocResp =
|
DistSchedAllocateResponse dsAllocResp =
|
||||||
new DistSchedAllocateResponsePBImpl(
|
new DistSchedAllocateResponsePBImpl(
|
||||||
dsProxy.allocateForDistributedScheduling(null,
|
dsProxy.allocateForDistributedScheduling(null,
|
||||||
|
@ -201,5 +212,14 @@ public class TestDistributedSchedulingService {
|
||||||
.newRecordInstance(AllocateRequest.class)).getProto()));
|
.newRecordInstance(AllocateRequest.class)).getProto()));
|
||||||
Assert.assertEquals(
|
Assert.assertEquals(
|
||||||
"h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
|
"h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
|
||||||
|
|
||||||
|
FinishApplicationMasterResponse dsfinishResp =
|
||||||
|
new FinishApplicationMasterResponsePBImpl(
|
||||||
|
dsProxy.finishApplicationMaster(null,
|
||||||
|
((FinishApplicationMasterRequestPBImpl) factory
|
||||||
|
.newRecordInstance(FinishApplicationMasterRequest.class))
|
||||||
|
.getProto()));
|
||||||
|
Assert.assertEquals(
|
||||||
|
false, dsfinishResp.getIsUnregistered());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue