YARN-2885. Create AMRMProxy request interceptor and ContainerAllocator to distribute OPPORTUNISTIC containers to appropriate Nodes (asuresh)
(cherry picked from commit 2bf025278a318b0452fdc9ece4427b4c42124e39)
(cherry picked from commit c282a08f38)

parent b56fc51b70
commit d80d24aabf
@@ -296,6 +296,48 @@ public class YarnConfiguration extends Configuration {

  /** ACL used in case none is found. Allows nothing. */
  public static final String DEFAULT_YARN_APP_ACL = " ";

  /** Is Distributed Scheduling Enabled. */
  public static final String DIST_SCHEDULING_ENABLED =
      YARN_PREFIX + "distributed-scheduling.enabled";
  public static final boolean DIST_SCHEDULING_ENABLED_DEFAULT = false;

  /** Minimum allocatable container memory for Distributed Scheduling. */
  public static final String DIST_SCHEDULING_MIN_MEMORY =
      YARN_PREFIX + "distributed-scheduling.min-memory";
  public static final int DIST_SCHEDULING_MIN_MEMORY_DEFAULT = 512;

  /** Minimum allocatable container vcores for Distributed Scheduling. */
  public static final String DIST_SCHEDULING_MIN_VCORES =
      YARN_PREFIX + "distributed-scheduling.min-vcores";
  public static final int DIST_SCHEDULING_MIN_VCORES_DEFAULT = 1;

  /** Maximum allocatable container memory for Distributed Scheduling. */
  public static final String DIST_SCHEDULING_MAX_MEMORY =
      YARN_PREFIX + "distributed-scheduling.max-memory";
  public static final int DIST_SCHEDULING_MAX_MEMORY_DEFAULT = 2048;

  /** Maximum allocatable container vcores for Distributed Scheduling. */
  public static final String DIST_SCHEDULING_MAX_VCORES =
      YARN_PREFIX + "distributed-scheduling.max-vcores";
  public static final int DIST_SCHEDULING_MAX_VCORES_DEFAULT = 4;

  /** Incremental allocatable container memory for Distributed Scheduling. */
  public static final String DIST_SCHEDULING_INCR_MEMORY =
      YARN_PREFIX + "distributed-scheduling.incr-memory";
  public static final int DIST_SCHEDULING_INCR_MEMORY_DEFAULT = 512;

  /** Incremental allocatable container vcores for Distributed Scheduling. */
  public static final String DIST_SCHEDULING_INCR_VCORES =
      YARN_PREFIX + "distributed-scheduling.incr-vcores";
  public static final int DIST_SCHEDULING_INCR_VCORES_DEFAULT = 1;

  /** Container token expiry for containers allocated via Distributed
   * Scheduling. */
  public static final String DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS =
      YARN_PREFIX + "distributed-scheduling.container-token-expiry";
  public static final int DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT =
      600000;

  /**
   * Enable/disable intermediate-data encryption at YARN level. For now, this
   * only is used by the FileSystemRMStateStore to setup right file-system
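For reviewers, a minimal sketch of driving these new knobs programmatically; the key constants are the ones introduced above, while the values are illustrative, not recommendations:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

Configuration conf = new YarnConfiguration();
// Distributed scheduling stays off unless explicitly enabled.
conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
// Sizing bounds applied to containers allocated by the local scheduler.
conf.setInt(YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY, 512);
conf.setInt(YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY, 2048);
conf.setInt(YarnConfiguration.DIST_SCHEDULING_MIN_VCORES, 1);
conf.setInt(YarnConfiguration.DIST_SCHEDULING_MAX_VCORES, 4);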
@@ -38,8 +38,10 @@ import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnProtos.AMCommandProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAccessTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto;

@@ -309,4 +311,17 @@ public class ProtoUtils {
  public static ExecutionType convertFromProtoFormat(ExecutionTypeProto e) {
    return ExecutionType.valueOf(e.name());
  }

  /*
   * Resource
   */
  public static synchronized YarnProtos.ResourceProto convertToProtoFormat(
      Resource r) {
    return ((ResourcePBImpl) r).getProto();
  }

  public static Resource convertFromProtoFormat(
      YarnProtos.ResourceProto resource) {
    return new ResourcePBImpl(resource);
  }
}
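For illustration (a sketch, not part of the patch), the new ProtoUtils helpers round-trip a Resource through its protobuf form:

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.proto.YarnProtos;

// 1024 MB, 2 vcores -> wire form -> back to a Resource record.
Resource capability = Resource.newInstance(1024, 2);
YarnProtos.ResourceProto proto = ProtoUtils.convertToProtoFormat(capability);
Resource roundTripped = ProtoUtils.convertFromProtoFormat(proto);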
@@ -142,6 +142,7 @@
        <source>
          <directory>${basedir}/src/main/proto</directory>
          <includes>
            <include>distributed_scheduler_protocol.proto</include>
            <include>yarn_server_common_protos.proto</include>
            <include>yarn_server_common_service_protos.proto</include>
@@ -0,0 +1,78 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.api;

import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;

import java.io.IOException;

/**
 * <p>This protocol extends the <code>ApplicationMasterProtocol</code>. It is
 * used by the <code>LocalScheduler</code> running on the NodeManager to wrap
 * the request / response objects of the <code>registerApplicationMaster</code>
 * and <code>allocate</code> methods of the protocol with additional
 * information required to perform Distributed Scheduling.
 * </p>
 */
public interface DistributedSchedulerProtocol
    extends ApplicationMasterProtocol {

  /**
   * <p>Extends <code>registerApplicationMaster</code> to wrap the response
   * with additional metadata.</p>
   *
   * @param request ApplicationMaster registration request
   * @return A <code>DistSchedRegisterResponse</code> that contains a standard
   *         AM registration response along with additional information
   *         required for Distributed Scheduling
   * @throws YarnException
   * @throws IOException
   */
  @Public
  @Unstable
  @Idempotent
  DistSchedRegisterResponse registerApplicationMasterForDistributedScheduling(
      RegisterApplicationMasterRequest request)
      throws YarnException, IOException;

  /**
   * <p>Extends <code>allocate</code> to wrap the response with additional
   * metadata.</p>
   *
   * @param request ApplicationMaster allocate request
   * @return A <code>DistSchedAllocateResponse</code> that contains a standard
   *         AM allocate response along with additional information required
   *         for Distributed Scheduling
   * @throws YarnException
   * @throws IOException
   */
  @Public
  @Unstable
  @Idempotent
  DistSchedAllocateResponse allocateForDistributedScheduling(
      AllocateRequest request) throws YarnException, IOException;
}
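A sketch of the intended call pattern on the NodeManager side; the helper method below is hypothetical and only illustrates how a caller would unwrap the embedded standard response:

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;

// Hypothetical helper: issue a wrapped allocate, refresh the local view of
// candidate nodes, and hand the standard AM response back to the caller.
AllocateResponse allocateAndTrackNodes(DistributedSchedulerProtocol rmProxy,
    AllocateRequest request) throws YarnException, IOException {
  DistSchedAllocateResponse wrapped =
      rmProxy.allocateForDistributedScheduling(request);
  // Nodes the RM currently suggests as targets; a real caller would cache these.
  List<NodeId> candidates = wrapped.getNodesForScheduling();
  return wrapped.getAllocateResponse();
}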
@@ -0,0 +1,36 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.api;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;

import org.apache.hadoop.yarn.proto.DistributedSchedulerProtocol;

@Private
@Unstable
@ProtocolInfo(
    protocolName = "org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB",
    protocolVersion = 1)
public interface DistributedSchedulerProtocolPB extends
    DistributedSchedulerProtocol.DistributedSchedulerProtocolService.BlockingInterface,
    ApplicationMasterProtocolService.BlockingInterface {
}
@@ -78,6 +78,10 @@ public class ServerRMProxy<T> extends RMProxy<T> {
          YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
          YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
          YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
    } else if (protocol == DistributedSchedulerProtocol.class) {
      return conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
          YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
          YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
    } else {
      String message = "Unsupported protocol found when creating the proxy " +
          "connection to ResourceManager: " +
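This branch is exercised whenever a proxy for the new protocol is requested through the existing factory, roughly like so (a sketch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
import org.apache.hadoop.yarn.server.api.ServerRMProxy;

Configuration conf = new YarnConfiguration();
// Resolves to the RM scheduler address via the new else-if branch above.
DistributedSchedulerProtocol rmProxy =
    ServerRMProxy.createRMProxy(conf, DistributedSchedulerProtocol.class);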
@@ -0,0 +1,151 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.api.impl.pb.client;

import com.google.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedRegisterResponsePBImpl;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;

public class DistributedSchedulerProtocolPBClientImpl implements
    DistributedSchedulerProtocol, Closeable {

  private DistributedSchedulerProtocolPB proxy;

  public DistributedSchedulerProtocolPBClientImpl(long clientVersion,
      InetSocketAddress addr, Configuration conf) throws IOException {
    RPC.setProtocolEngine(conf, DistributedSchedulerProtocolPB.class,
        ProtobufRpcEngine.class);
    proxy = RPC.getProxy(DistributedSchedulerProtocolPB.class, clientVersion,
        addr, conf);
  }

  @Override
  public void close() {
    if (this.proxy != null) {
      RPC.stopProxy(this.proxy);
    }
  }

  @Override
  public DistSchedRegisterResponse
      registerApplicationMasterForDistributedScheduling(
          RegisterApplicationMasterRequest request)
      throws YarnException, IOException {
    YarnServiceProtos.RegisterApplicationMasterRequestProto requestProto =
        ((RegisterApplicationMasterRequestPBImpl) request).getProto();
    try {
      return new DistSchedRegisterResponsePBImpl(
          proxy.registerApplicationMasterForDistributedScheduling(
              null, requestProto));
    } catch (ServiceException e) {
      RPCUtil.unwrapAndThrowException(e);
      return null;
    }
  }

  @Override
  public DistSchedAllocateResponse allocateForDistributedScheduling(
      AllocateRequest request) throws YarnException, IOException {
    YarnServiceProtos.AllocateRequestProto requestProto =
        ((AllocateRequestPBImpl) request).getProto();
    try {
      return new DistSchedAllocateResponsePBImpl(
          proxy.allocateForDistributedScheduling(null, requestProto));
    } catch (ServiceException e) {
      RPCUtil.unwrapAndThrowException(e);
      return null;
    }
  }

  @Override
  public RegisterApplicationMasterResponse registerApplicationMaster(
      RegisterApplicationMasterRequest request)
      throws YarnException, IOException {
    YarnServiceProtos.RegisterApplicationMasterRequestProto requestProto =
        ((RegisterApplicationMasterRequestPBImpl) request).getProto();
    try {
      return new RegisterApplicationMasterResponsePBImpl(
          proxy.registerApplicationMaster(null, requestProto));
    } catch (ServiceException e) {
      RPCUtil.unwrapAndThrowException(e);
      return null;
    }
  }

  @Override
  public FinishApplicationMasterResponse finishApplicationMaster(
      FinishApplicationMasterRequest request)
      throws YarnException, IOException {
    YarnServiceProtos.FinishApplicationMasterRequestProto requestProto =
        ((FinishApplicationMasterRequestPBImpl) request).getProto();
    try {
      return new FinishApplicationMasterResponsePBImpl(
          proxy.finishApplicationMaster(null, requestProto));
    } catch (ServiceException e) {
      RPCUtil.unwrapAndThrowException(e);
      return null;
    }
  }

  @Override
  public AllocateResponse allocate(AllocateRequest request)
      throws YarnException, IOException {
    YarnServiceProtos.AllocateRequestProto requestProto =
        ((AllocateRequestPBImpl) request).getProto();
    try {
      return new AllocateResponsePBImpl(proxy.allocate(null, requestProto));
    } catch (ServiceException e) {
      RPCUtil.unwrapAndThrowException(e);
      return null;
    }
  }
}
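If the client ever needed to be constructed directly rather than through the RPC factory, it would look roughly like this (host, port, and version are illustrative; in practice ServerRMProxy does this wiring):

import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
// Illustrative address; protocol version 1 matches @ProtocolInfo above.
InetSocketAddress rmAddr = new InetSocketAddress("rm.example.com", 8030);
DistributedSchedulerProtocolPBClientImpl client =
    new DistributedSchedulerProtocolPBClientImpl(1L, rmAddr, conf);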
@@ -0,0 +1,143 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.api.impl.pb.service;

import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedRegisterResponsePBImpl;

import java.io.IOException;

public class DistributedSchedulerProtocolPBServiceImpl implements
    DistributedSchedulerProtocolPB {

  private DistributedSchedulerProtocol real;

  public DistributedSchedulerProtocolPBServiceImpl(
      DistributedSchedulerProtocol impl) {
    this.real = impl;
  }

  @Override
  public YarnServerCommonServiceProtos.DistSchedRegisterResponseProto
      registerApplicationMasterForDistributedScheduling(
          RpcController controller,
          RegisterApplicationMasterRequestProto proto)
      throws ServiceException {
    RegisterApplicationMasterRequestPBImpl request =
        new RegisterApplicationMasterRequestPBImpl(proto);
    try {
      DistSchedRegisterResponse response =
          real.registerApplicationMasterForDistributedScheduling(request);
      return ((DistSchedRegisterResponsePBImpl) response).getProto();
    } catch (YarnException e) {
      throw new ServiceException(e);
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

  @Override
  public YarnServerCommonServiceProtos.DistSchedAllocateResponseProto
      allocateForDistributedScheduling(RpcController controller,
          AllocateRequestProto proto) throws ServiceException {
    AllocateRequestPBImpl request = new AllocateRequestPBImpl(proto);
    try {
      DistSchedAllocateResponse response =
          real.allocateForDistributedScheduling(request);
      return ((DistSchedAllocateResponsePBImpl) response).getProto();
    } catch (YarnException e) {
      throw new ServiceException(e);
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

  @Override
  public YarnServiceProtos.AllocateResponseProto allocate(RpcController arg0,
      AllocateRequestProto proto) throws ServiceException {
    AllocateRequestPBImpl request = new AllocateRequestPBImpl(proto);
    try {
      AllocateResponse response = real.allocate(request);
      return ((AllocateResponsePBImpl) response).getProto();
    } catch (YarnException e) {
      throw new ServiceException(e);
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

  @Override
  public YarnServiceProtos.FinishApplicationMasterResponseProto
      finishApplicationMaster(RpcController arg0,
          YarnServiceProtos.FinishApplicationMasterRequestProto proto)
      throws ServiceException {
    FinishApplicationMasterRequestPBImpl request =
        new FinishApplicationMasterRequestPBImpl(proto);
    try {
      FinishApplicationMasterResponse response =
          real.finishApplicationMaster(request);
      return ((FinishApplicationMasterResponsePBImpl) response).getProto();
    } catch (YarnException e) {
      throw new ServiceException(e);
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

  @Override
  public YarnServiceProtos.RegisterApplicationMasterResponseProto
      registerApplicationMaster(RpcController arg0,
          RegisterApplicationMasterRequestProto proto)
      throws ServiceException {
    RegisterApplicationMasterRequestPBImpl request =
        new RegisterApplicationMasterRequestPBImpl(proto);
    try {
      RegisterApplicationMasterResponse response =
          real.registerApplicationMaster(request);
      return ((RegisterApplicationMasterResponsePBImpl) response).getProto();
    } catch (YarnException e) {
      throw new ServiceException(e);
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }
}
@@ -0,0 +1,58 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.api.protocolrecords;

import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.util.Records;

import java.util.List;

@Public
@Unstable
public abstract class DistSchedAllocateResponse {

  @Public
  @Unstable
  public static DistSchedAllocateResponse newInstance(
      AllocateResponse allResp) {
    DistSchedAllocateResponse response =
        Records.newRecord(DistSchedAllocateResponse.class);
    response.setAllocateResponse(allResp);
    return response;
  }

  @Public
  @Unstable
  public abstract void setAllocateResponse(AllocateResponse response);

  @Public
  @Unstable
  public abstract AllocateResponse getAllocateResponse();

  @Public
  @Unstable
  public abstract void setNodesForScheduling(List<NodeId> nodesForScheduling);

  @Public
  @Unstable
  public abstract List<NodeId> getNodesForScheduling();
}
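Constructed like any other YARN record; a small sketch with illustrative values:

import java.util.Arrays;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.util.Records;

AllocateResponse amResponse = Records.newRecord(AllocateResponse.class);
DistSchedAllocateResponse dsResponse =
    DistSchedAllocateResponse.newInstance(amResponse);
// Suggest a node as a target for subsequent local allocation.
dsResponse.setNodesForScheduling(
    Arrays.asList(NodeId.newInstance("node1.example.com", 45454)));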
@@ -0,0 +1,102 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.api.protocolrecords;

import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Records;

import java.util.List;

@Public
@Unstable
public abstract class DistSchedRegisterResponse {

  @Public
  @Unstable
  public static DistSchedRegisterResponse newInstance(
      RegisterApplicationMasterResponse regAMResp) {
    DistSchedRegisterResponse response =
        Records.newRecord(DistSchedRegisterResponse.class);
    response.setRegisterResponse(regAMResp);
    return response;
  }

  @Public
  @Unstable
  public abstract void setRegisterResponse(
      RegisterApplicationMasterResponse resp);

  @Public
  @Unstable
  public abstract RegisterApplicationMasterResponse getRegisterResponse();

  @Public
  @Unstable
  public abstract void setMinAllocatableCapability(Resource minResource);

  @Public
  @Unstable
  public abstract Resource getMinAllocatableCapability();

  @Public
  @Unstable
  public abstract void setMaxAllocatableCapability(Resource maxResource);

  @Public
  @Unstable
  public abstract Resource getMaxAllocatableCapability();

  @Public
  @Unstable
  public abstract void setIncrAllocatableCapability(Resource incrResource);

  @Public
  @Unstable
  public abstract Resource getIncrAllocatableCapability();

  @Public
  @Unstable
  public abstract void setContainerTokenExpiryInterval(int interval);

  @Public
  @Unstable
  public abstract int getContainerTokenExpiryInterval();

  @Public
  @Unstable
  public abstract void setContainerIdStart(long containerIdStart);

  @Public
  @Unstable
  public abstract long getContainerIdStart();

  @Public
  @Unstable
  public abstract void setNodesForScheduling(List<NodeId> nodesForScheduling);

  @Public
  @Unstable
  public abstract List<NodeId> getNodesForScheduling();

}
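On the RM side this record would plausibly be populated from the DIST_SCHEDULING_* keys introduced earlier; a construction sketch with illustrative values:

import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Records;

RegisterApplicationMasterResponse amResponse =
    Records.newRecord(RegisterApplicationMasterResponse.class);
DistSchedRegisterResponse response =
    DistSchedRegisterResponse.newInstance(amResponse);
// Values mirror the configuration defaults above; illustrative only.
response.setMinAllocatableCapability(Resource.newInstance(512, 1));
response.setMaxAllocatableCapability(Resource.newInstance(2048, 4));
response.setIncrAllocatableCapability(Resource.newInstance(512, 1));
response.setContainerTokenExpiryInterval(600000);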
@@ -0,0 +1,180 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;

import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class DistSchedAllocateResponsePBImpl extends DistSchedAllocateResponse {

  YarnServerCommonServiceProtos.DistSchedAllocateResponseProto proto =
      YarnServerCommonServiceProtos.DistSchedAllocateResponseProto
          .getDefaultInstance();
  YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.Builder builder =
      null;
  boolean viaProto = false;

  private AllocateResponse allocateResponse;
  private List<NodeId> nodesForScheduling;

  public DistSchedAllocateResponsePBImpl() {
    builder =
        YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.newBuilder();
  }

  public DistSchedAllocateResponsePBImpl(
      YarnServerCommonServiceProtos.DistSchedAllocateResponseProto proto) {
    this.proto = proto;
    viaProto = true;
  }

  public YarnServerCommonServiceProtos.DistSchedAllocateResponseProto getProto() {
    mergeLocalToProto();
    proto = viaProto ? proto : builder.build();
    viaProto = true;
    return proto;
  }

  private void maybeInitBuilder() {
    if (viaProto || builder == null) {
      builder = YarnServerCommonServiceProtos.DistSchedAllocateResponseProto
          .newBuilder(proto);
    }
    viaProto = false;
  }

  private synchronized void mergeLocalToProto() {
    if (viaProto) {
      maybeInitBuilder();
    }
    mergeLocalToBuilder();
    proto = builder.build();
    viaProto = true;
  }

  private synchronized void mergeLocalToBuilder() {
    if (this.nodesForScheduling != null) {
      builder.clearNodesForScheduling();
      Iterable<YarnProtos.NodeIdProto> iterable =
          getNodeIdProtoIterable(this.nodesForScheduling);
      builder.addAllNodesForScheduling(iterable);
    }
    if (this.allocateResponse != null) {
      builder.setAllocateResponse(
          ((AllocateResponsePBImpl) this.allocateResponse).getProto());
    }
  }

  @Override
  public void setAllocateResponse(AllocateResponse response) {
    maybeInitBuilder();
    if (allocateResponse == null) {
      builder.clearAllocateResponse();
    }
    this.allocateResponse = response;
  }

  @Override
  public AllocateResponse getAllocateResponse() {
    if (this.allocateResponse != null) {
      return this.allocateResponse;
    }

    YarnServerCommonServiceProtos.DistSchedAllocateResponseProtoOrBuilder p =
        viaProto ? proto : builder;
    if (!p.hasAllocateResponse()) {
      return null;
    }

    this.allocateResponse =
        new AllocateResponsePBImpl(p.getAllocateResponse());
    return this.allocateResponse;
  }

  @Override
  public void setNodesForScheduling(List<NodeId> nodesForScheduling) {
    maybeInitBuilder();
    if (nodesForScheduling == null || nodesForScheduling.isEmpty()) {
      if (this.nodesForScheduling != null) {
        this.nodesForScheduling.clear();
      }
      builder.clearNodesForScheduling();
      return;
    }
    this.nodesForScheduling = new ArrayList<>();
    this.nodesForScheduling.addAll(nodesForScheduling);
  }

  @Override
  public List<NodeId> getNodesForScheduling() {
    if (nodesForScheduling != null) {
      return nodesForScheduling;
    }
    initLocalNodesForSchedulingList();
    return nodesForScheduling;
  }

  private synchronized void initLocalNodesForSchedulingList() {
    YarnServerCommonServiceProtos.DistSchedAllocateResponseProtoOrBuilder p =
        viaProto ? proto : builder;
    List<YarnProtos.NodeIdProto> list = p.getNodesForSchedulingList();
    nodesForScheduling = new ArrayList<>();
    if (list != null) {
      for (YarnProtos.NodeIdProto t : list) {
        nodesForScheduling.add(ProtoUtils.convertFromProtoFormat(t));
      }
    }
  }

  private synchronized Iterable<YarnProtos.NodeIdProto> getNodeIdProtoIterable(
      final List<NodeId> nodeList) {
    maybeInitBuilder();
    return new Iterable<YarnProtos.NodeIdProto>() {
      @Override
      public synchronized Iterator<YarnProtos.NodeIdProto> iterator() {
        return new Iterator<YarnProtos.NodeIdProto>() {

          Iterator<NodeId> iter = nodeList.iterator();

          @Override
          public boolean hasNext() {
            return iter.hasNext();
          }

          @Override
          public YarnProtos.NodeIdProto next() {
            return ProtoUtils.convertToProtoFormat(iter.next());
          }

          @Override
          public void remove() {
            throw new UnsupportedOperationException();
          }
        };
      }
    };
  }
}
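The proto/builder/viaProto triple is the standard YARN PBImpl copy-on-write pattern: setters cache local records, getters prefer the cache, and getProto() merges the cache back into the protobuf before handing it out. A round-trip sketch:

import java.util.Arrays;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.DistSchedAllocateResponseProto;

DistSchedAllocateResponsePBImpl original = new DistSchedAllocateResponsePBImpl();
original.setNodesForScheduling(
    Arrays.asList(NodeId.newInstance("worker-1", 45454)));
// Serialize to the wire form, then rebuild an equivalent record from it.
DistSchedAllocateResponseProto proto = original.getProto();
DistSchedAllocateResponsePBImpl copy =
    new DistSchedAllocateResponsePBImpl(proto);
assert "worker-1".equals(copy.getNodesForScheduling().get(0).getHost());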
@@ -0,0 +1,304 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;

import com.google.protobuf.TextFormat;

import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class DistSchedRegisterResponsePBImpl extends DistSchedRegisterResponse {

  YarnServerCommonServiceProtos.DistSchedRegisterResponseProto proto =
      YarnServerCommonServiceProtos.DistSchedRegisterResponseProto
          .getDefaultInstance();
  YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.Builder builder =
      null;
  boolean viaProto = false;

  private Resource maxAllocatableCapability;
  private Resource minAllocatableCapability;
  private Resource incrAllocatableCapability;
  private List<NodeId> nodesForScheduling;
  private RegisterApplicationMasterResponse registerApplicationMasterResponse;

  public DistSchedRegisterResponsePBImpl() {
    builder =
        YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.newBuilder();
  }

  public DistSchedRegisterResponsePBImpl(
      YarnServerCommonServiceProtos.DistSchedRegisterResponseProto proto) {
    this.proto = proto;
    viaProto = true;
  }

  public YarnServerCommonServiceProtos.DistSchedRegisterResponseProto getProto() {
    mergeLocalToProto();
    proto = viaProto ? proto : builder.build();
    viaProto = true;
    return proto;
  }

  private void maybeInitBuilder() {
    if (viaProto || builder == null) {
      builder = YarnServerCommonServiceProtos.DistSchedRegisterResponseProto
          .newBuilder(proto);
    }
    viaProto = false;
  }

  private synchronized void mergeLocalToProto() {
    if (viaProto) {
      maybeInitBuilder();
    }
    mergeLocalToBuilder();
    proto = builder.build();
    viaProto = true;
  }

  private synchronized void mergeLocalToBuilder() {
    if (this.nodesForScheduling != null) {
      builder.clearNodesForScheduling();
      Iterable<YarnProtos.NodeIdProto> iterable =
          getNodeIdProtoIterable(this.nodesForScheduling);
      builder.addAllNodesForScheduling(iterable);
    }
    if (this.maxAllocatableCapability != null) {
      builder.setMaxAllocCapability(
          ProtoUtils.convertToProtoFormat(this.maxAllocatableCapability));
    }
    if (this.minAllocatableCapability != null) {
      builder.setMinAllocCapability(
          ProtoUtils.convertToProtoFormat(this.minAllocatableCapability));
    }
    if (this.incrAllocatableCapability != null) {
      builder.setIncrAllocCapability(
          ProtoUtils.convertToProtoFormat(this.incrAllocatableCapability));
    }
    if (this.registerApplicationMasterResponse != null) {
      builder.setRegisterResponse(
          ((RegisterApplicationMasterResponsePBImpl)
              this.registerApplicationMasterResponse).getProto());
    }
  }

  @Override
  public void setRegisterResponse(RegisterApplicationMasterResponse resp) {
    maybeInitBuilder();
    if (registerApplicationMasterResponse == null) {
      builder.clearRegisterResponse();
    }
    this.registerApplicationMasterResponse = resp;
  }

  @Override
  public RegisterApplicationMasterResponse getRegisterResponse() {
    if (this.registerApplicationMasterResponse != null) {
      return this.registerApplicationMasterResponse;
    }

    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p =
        viaProto ? proto : builder;
    if (!p.hasRegisterResponse()) {
      return null;
    }

    this.registerApplicationMasterResponse =
        new RegisterApplicationMasterResponsePBImpl(p.getRegisterResponse());
    return this.registerApplicationMasterResponse;
  }

  @Override
  public void setMaxAllocatableCapability(Resource maxResource) {
    maybeInitBuilder();
    if (maxAllocatableCapability == null) {
      builder.clearMaxAllocCapability();
    }
    this.maxAllocatableCapability = maxResource;
  }

  @Override
  public Resource getMaxAllocatableCapability() {
    if (this.maxAllocatableCapability != null) {
      return this.maxAllocatableCapability;
    }

    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p =
        viaProto ? proto : builder;
    if (!p.hasMaxAllocCapability()) {
      return null;
    }

    this.maxAllocatableCapability =
        ProtoUtils.convertFromProtoFormat(p.getMaxAllocCapability());
    return this.maxAllocatableCapability;
  }

  @Override
  public void setMinAllocatableCapability(Resource minResource) {
    maybeInitBuilder();
    if (minAllocatableCapability == null) {
      builder.clearMinAllocCapability();
    }
    this.minAllocatableCapability = minResource;
  }

  @Override
  public Resource getMinAllocatableCapability() {
    if (this.minAllocatableCapability != null) {
      return this.minAllocatableCapability;
    }

    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p =
        viaProto ? proto : builder;
    if (!p.hasMinAllocCapability()) {
      return null;
    }

    this.minAllocatableCapability =
        ProtoUtils.convertFromProtoFormat(p.getMinAllocCapability());
    return this.minAllocatableCapability;
  }

  @Override
  public void setIncrAllocatableCapability(Resource incrResource) {
    maybeInitBuilder();
    if (incrAllocatableCapability == null) {
      builder.clearIncrAllocCapability();
    }
    this.incrAllocatableCapability = incrResource;
  }

  @Override
  public Resource getIncrAllocatableCapability() {
    if (this.incrAllocatableCapability != null) {
      return this.incrAllocatableCapability;
    }

    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p =
        viaProto ? proto : builder;
    if (!p.hasIncrAllocCapability()) {
      return null;
    }

    this.incrAllocatableCapability =
        ProtoUtils.convertFromProtoFormat(p.getIncrAllocCapability());
    return this.incrAllocatableCapability;
  }

  @Override
  public void setContainerTokenExpiryInterval(int interval) {
    maybeInitBuilder();
    builder.setContainerTokenExpiryInterval(interval);
  }

  @Override
  public int getContainerTokenExpiryInterval() {
    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p =
        viaProto ? proto : builder;
    if (!p.hasContainerTokenExpiryInterval()) {
      return 0;
    }
    return p.getContainerTokenExpiryInterval();
  }

  @Override
  public void setContainerIdStart(long containerIdStart) {
    maybeInitBuilder();
    builder.setContainerIdStart(containerIdStart);
  }

  @Override
  public long getContainerIdStart() {
    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p =
        viaProto ? proto : builder;
    if (!p.hasContainerIdStart()) {
      return 0;
    }
    return p.getContainerIdStart();
  }

  @Override
  public void setNodesForScheduling(List<NodeId> nodesForScheduling) {
    maybeInitBuilder();
    if (nodesForScheduling == null || nodesForScheduling.isEmpty()) {
      if (this.nodesForScheduling != null) {
        this.nodesForScheduling.clear();
      }
      builder.clearNodesForScheduling();
      return;
    }
    this.nodesForScheduling = new ArrayList<>();
    this.nodesForScheduling.addAll(nodesForScheduling);
  }

  @Override
  public List<NodeId> getNodesForScheduling() {
    if (nodesForScheduling != null) {
      return nodesForScheduling;
    }
    initLocalNodesForSchedulingList();
    return nodesForScheduling;
  }

  private synchronized void initLocalNodesForSchedulingList() {
    YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p =
        viaProto ? proto : builder;
    List<YarnProtos.NodeIdProto> list = p.getNodesForSchedulingList();
    nodesForScheduling = new ArrayList<>();
    if (list != null) {
      for (YarnProtos.NodeIdProto t : list) {
        nodesForScheduling.add(ProtoUtils.convertFromProtoFormat(t));
      }
    }
  }

  private synchronized Iterable<YarnProtos.NodeIdProto> getNodeIdProtoIterable(
      final List<NodeId> nodeList) {
    maybeInitBuilder();
    return new Iterable<YarnProtos.NodeIdProto>() {
      @Override
      public synchronized Iterator<YarnProtos.NodeIdProto> iterator() {
        return new Iterator<YarnProtos.NodeIdProto>() {

          Iterator<NodeId> iter = nodeList.iterator();

          @Override
          public boolean hasNext() {
            return iter.hasNext();
          }

          @Override
          public YarnProtos.NodeIdProto next() {
            return ProtoUtils.convertToProtoFormat(iter.next());
          }

          @Override
          public void remove() {
            throw new UnsupportedOperationException();
          }
        };
      }
    };
  }

  @Override
  public String toString() {
    return TextFormat.shortDebugString(getProto());
  }
}
@@ -0,0 +1,38 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * These .proto interfaces are public and stable.
 * Please see http://wiki.apache.org/hadoop/Compatibility
 * for what changes are allowed for a *stable* .proto interface.
 */

option java_package = "org.apache.hadoop.yarn.proto";
option java_outer_classname = "DistributedSchedulerProtocol";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.yarn;

import "yarn_service_protos.proto";
import "yarn_server_common_service_protos.proto";

service DistributedSchedulerProtocolService {
  rpc registerApplicationMasterForDistributedScheduling (RegisterApplicationMasterRequestProto) returns (DistSchedRegisterResponseProto);
  rpc allocateForDistributedScheduling (AllocateRequestProto) returns (DistSchedAllocateResponseProto);
}
@@ -26,6 +26,21 @@ import "yarn_protos.proto";
import "yarn_server_common_protos.proto";
import "yarn_service_protos.proto";

message DistSchedRegisterResponseProto {
  optional RegisterApplicationMasterResponseProto register_response = 1;
  optional ResourceProto max_alloc_capability = 2;
  optional ResourceProto min_alloc_capability = 3;
  optional ResourceProto incr_alloc_capability = 4;
  optional int32 container_token_expiry_interval = 5;
  optional int64 container_id_start = 6;
  repeated NodeIdProto nodes_for_scheduling = 7;
}

message DistSchedAllocateResponseProto {
  optional AllocateResponseProto allocate_response = 1;
  repeated NodeIdProto nodes_for_scheduling = 2;
}

message NodeLabelsProto {
  repeated NodeLabelProto nodeLabels = 1;
}
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;

@@ -106,4 +107,8 @@ public interface Context {
   * queued and killed.
   */
  QueuingContext getQueuingContext();

  boolean isDistributedSchedulingEnabled();

  OpportunisticContainerAllocator getContainerAllocator();
}
@@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ScriptBasedNodeLabel
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer;

@@ -195,9 +196,9 @@ public class NodeManager extends CompositeService
  protected NMContext createNMContext(
      NMContainerTokenSecretManager containerTokenSecretManager,
      NMTokenSecretManagerInNM nmTokenSecretManager,
-      NMStateStoreService stateStore) {
+      NMStateStoreService stateStore, boolean isDistSchedulerEnabled) {
    return new NMContext(containerTokenSecretManager, nmTokenSecretManager,
-        dirsHandler, aclsManager, stateStore);
+        dirsHandler, aclsManager, stateStore, isDistSchedulerEnabled);
  }

  protected void doSecureLogin() throws IOException {

@@ -318,8 +319,12 @@ public class NodeManager extends CompositeService
        getNodeHealthScriptRunner(conf), dirsHandler);
    addService(nodeHealthChecker);

    boolean isDistSchedulingEnabled =
        conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED,
            YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT);

    this.context = createNMContext(containerTokenSecretManager,
-        nmTokenSecretManager, nmStore);
+        nmTokenSecretManager, nmStore, isDistSchedulingEnabled);

    nodeLabelsProvider = createNodeLabelsProvider(conf);

@@ -348,6 +353,10 @@ public class NodeManager extends CompositeService
    addService(webServer);
    ((NMContext) context).setWebServer(webServer);

    ((NMContext) context).setQueueableContainerAllocator(
        new OpportunisticContainerAllocator(nodeStatusUpdater, context,
            webServer.getPort()));

    dispatcher.register(ContainerManagerEventType.class, containerManager);
    dispatcher.register(NodeManagerEventType.class, this);
    addService(dispatcher);

@@ -468,13 +477,16 @@ public class NodeManager extends CompositeService
    private final ConcurrentLinkedQueue<LogAggregationReport>
        logAggregationReportForApps;
    private NodeStatusUpdater nodeStatusUpdater;
    private final boolean isDistSchedulingEnabled;

    private OpportunisticContainerAllocator containerAllocator;

    private final QueuingContext queuingContext;

    public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
        NMTokenSecretManagerInNM nmTokenSecretManager,
        LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager,
-        NMStateStoreService stateStore) {
+        NMStateStoreService stateStore, boolean isDistSchedulingEnabled) {
      this.containerTokenSecretManager = containerTokenSecretManager;
      this.nmTokenSecretManager = nmTokenSecretManager;
      this.dirsHandler = dirsHandler;

@@ -486,6 +498,7 @@ public class NodeManager extends CompositeService
      this.logAggregationReportForApps = new ConcurrentLinkedQueue<
          LogAggregationReport>();
      this.queuingContext = new QueuingNMContext();
      this.isDistSchedulingEnabled = isDistSchedulingEnabled;
    }

    /**

@@ -611,6 +624,20 @@ public class NodeManager extends CompositeService
    public QueuingContext getQueuingContext() {
      return this.queuingContext;
    }

    public boolean isDistributedSchedulingEnabled() {
      return isDistSchedulingEnabled;
    }

    public void setQueueableContainerAllocator(
        OpportunisticContainerAllocator containerAllocator) {
      this.containerAllocator = containerAllocator;
    }

    @Override
    public OpportunisticContainerAllocator getContainerAllocator() {
      return containerAllocator;
    }
  }

  /**
@@ -64,6 +64,8 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;

import org.apache.hadoop.yarn.server.nodemanager.scheduler.LocalScheduler;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;

@@ -465,6 +467,12 @@ public class AMRMProxyService extends AbstractService implements
      interceptorClassNames.add(item.trim());
    }

    // Make sure LocalScheduler is present at the beginning of the chain.
    if (this.nmContext.isDistributedSchedulingEnabled()) {
      interceptorClassNames.add(0, LocalScheduler.class.getName());
    }

    return interceptorClassNames;
  }
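Concretely, if an operator had configured a custom interceptor pipeline, enabling distributed scheduling prepends the LocalScheduler. The pipeline property name below is recalled from the AMRMProxy configuration and should be treated as an assumption; the custom interceptor class is hypothetical:

// yarn-site.xml (sketch):
//   yarn.nodemanager.amrmproxy.interceptor-class.pipeline =
//       com.example.AuditInterceptor,DefaultRequestInterceptor
//
// Effective chain with DIST_SCHEDULING_ENABLED = true:
//   LocalScheduler -> com.example.AuditInterceptor -> DefaultRequestInterceptor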
@ -21,6 +21,14 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
import org.apache.hadoop.conf.Configuration;

import com.google.common.base.Preconditions;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;

import java.io.IOException;

/**
 * Implements the RequestInterceptor interface and provides common functionality
@ -99,4 +107,38 @@ public abstract class AbstractRequestInterceptor implements
  public AMRMProxyApplicationContext getApplicationContext() {
    return this.appContext;
  }

  /**
   * Default implementation that invokes the distributed scheduling version
   * of the allocate method on the next interceptor in the chain.
   *
   * @param request ApplicationMaster allocate request
   * @return Distributed Scheduler Allocate Response
   * @throws YarnException
   * @throws IOException
   */
  @Override
  public DistSchedAllocateResponse allocateForDistributedScheduling(
      AllocateRequest request) throws YarnException, IOException {
    return (this.nextInterceptor != null) ?
        this.nextInterceptor.allocateForDistributedScheduling(request) : null;
  }

  /**
   * Default implementation that invokes the distributed scheduling version
   * of the register method on the next interceptor in the chain.
   *
   * @param request ApplicationMaster registration request
   * @return Distributed Scheduler Register Response
   * @throws YarnException
   * @throws IOException
   */
  @Override
  public DistSchedRegisterResponse
      registerApplicationMasterForDistributedScheduling(
          RegisterApplicationMasterRequest request)
          throws YarnException, IOException {
    return (this.nextInterceptor != null) ? this.nextInterceptor
        .registerApplicationMasterForDistributedScheduling(request) : null;
  }
}
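Both defaults above are plain chain-of-responsibility forwarding: delegate if a next interceptor exists, otherwise return null at the end of the chain. A toy, self-contained sketch of that pattern — not a Hadoop API, just an illustration:

// Sketch: the forwarding default used by AbstractRequestInterceptor, reduced
// to a toy interface so it runs standalone.
public class ForwardingSketch {
  interface Interceptor {
    String handle(String request);
  }

  // Mirrors the defaults above: forward if a next interceptor exists,
  // otherwise return null (the end of the chain).
  static class Forwarding implements Interceptor {
    private final Interceptor next;
    Forwarding(Interceptor next) { this.next = next; }
    public String handle(String request) {
      return (next != null) ? next.handle(request) : null;
    }
  }

  public static void main(String[] args) {
    Interceptor tail = r -> "handled:" + r;
    Interceptor chain = new Forwarding(new Forwarding(tail));
    System.out.println(chain.handle("allocate")); // handled:allocate
  }
}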
@ -20,10 +20,15 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;

import com.google.common.base.Joiner;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@ -33,9 +38,16 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
import org.apache.hadoop.yarn.server.api.ServerRMProxy;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ -50,7 +62,7 @@ public final class DefaultRequestInterceptor extends
    AbstractRequestInterceptor {
  private static final Logger LOG = LoggerFactory
      .getLogger(DefaultRequestInterceptor.class);
  private ApplicationMasterProtocol rmClient;
  private DistributedSchedulerProtocol rmClient;
  private UserGroupInformation user = null;

  @Override
@ -65,11 +77,12 @@ public final class DefaultRequestInterceptor extends
      final Configuration conf = this.getConf();

      rmClient =
          user.doAs(new PrivilegedExceptionAction<ApplicationMasterProtocol>() {
          user.doAs(new PrivilegedExceptionAction<DistributedSchedulerProtocol>() {
            @Override
            public ApplicationMasterProtocol run() throws Exception {
              return ClientRMProxy.createRMProxy(conf,
                  ApplicationMasterProtocol.class);
            public DistributedSchedulerProtocol run() throws Exception {
              setAMRMTokenService(conf);
              return ServerRMProxy.createRMProxy(conf,
                  DistributedSchedulerProtocol.class);
            }
          });
    } catch (IOException e) {
@ -109,6 +122,32 @@ public final class DefaultRequestInterceptor extends
    return allocateResponse;
  }

  @Override
  public DistSchedRegisterResponse
      registerApplicationMasterForDistributedScheduling(
          RegisterApplicationMasterRequest request)
          throws YarnException, IOException {
    LOG.info("Forwarding registerApplicationMasterForDistributedScheduling " +
        "request to the real YARN RM");
    return rmClient.registerApplicationMasterForDistributedScheduling(request);
  }

  @Override
  public DistSchedAllocateResponse allocateForDistributedScheduling(
      AllocateRequest request) throws YarnException, IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Forwarding allocateForDistributedScheduling request " +
          "to the real YARN RM");
    }
    DistSchedAllocateResponse allocateResponse =
        rmClient.allocateForDistributedScheduling(request);
    if (allocateResponse.getAllocateResponse().getAMRMToken() != null) {
      updateAMRMToken(allocateResponse.getAllocateResponse().getAMRMToken());
    }

    return allocateResponse;
  }

  @Override
  public FinishApplicationMasterResponse finishApplicationMaster(
      final FinishApplicationMasterRequest request) throws YarnException,
@ -139,7 +178,85 @@ public final class DefaultRequestInterceptor extends
  }

  @VisibleForTesting
  public void setRMClient(ApplicationMasterProtocol rmClient) {
    this.rmClient = rmClient;
  public void setRMClient(final ApplicationMasterProtocol rmClient) {
    if (rmClient instanceof DistributedSchedulerProtocol) {
      this.rmClient = (DistributedSchedulerProtocol) rmClient;
    } else {
      this.rmClient = new DistributedSchedulerProtocol() {
        @Override
        public RegisterApplicationMasterResponse registerApplicationMaster(
            RegisterApplicationMasterRequest request)
            throws YarnException, IOException {
          return rmClient.registerApplicationMaster(request);
        }

        @Override
        public FinishApplicationMasterResponse finishApplicationMaster(
            FinishApplicationMasterRequest request)
            throws YarnException, IOException {
          return rmClient.finishApplicationMaster(request);
        }

        @Override
        public AllocateResponse allocate(AllocateRequest request)
            throws YarnException, IOException {
          return rmClient.allocate(request);
        }

        @Override
        public DistSchedRegisterResponse
            registerApplicationMasterForDistributedScheduling(
                RegisterApplicationMasterRequest request)
                throws YarnException, IOException {
          throw new IOException("Not Supported !!");
        }

        @Override
        public DistSchedAllocateResponse
            allocateForDistributedScheduling(AllocateRequest request)
            throws YarnException, IOException {
          throw new IOException("Not Supported !!");
        }
      };
    }
  }

  private static void setAMRMTokenService(final Configuration conf)
      throws IOException {
    for (org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
        token : UserGroupInformation.getCurrentUser().getTokens()) {
      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
        token.setService(getAMRMTokenService(conf));
      }
    }
  }

  @InterfaceStability.Unstable
  public static Text getAMRMTokenService(Configuration conf) {
    return getTokenService(conf, YarnConfiguration.RM_SCHEDULER_ADDRESS,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
  }

  @InterfaceStability.Unstable
  public static Text getTokenService(Configuration conf, String address,
      String defaultAddr, int defaultPort) {
    if (HAUtil.isHAEnabled(conf)) {
      // Build a list of service addresses to form the service name
      ArrayList<String> services = new ArrayList<String>();
      YarnConfiguration yarnConf = new YarnConfiguration(conf);
      for (String rmId : HAUtil.getRMHAIds(conf)) {
        // Set RM_ID to get the corresponding RM_ADDRESS
        yarnConf.set(YarnConfiguration.RM_HA_ID, rmId);
        services.add(SecurityUtil.buildTokenService(
            yarnConf.getSocketAddr(address, defaultAddr, defaultPort))
            .toString());
      }
      return new Text(Joiner.on(',').join(services));
    }

    // Non-HA case - no need to set RM_ID
    return SecurityUtil.buildTokenService(conf.getSocketAddr(address,
        defaultAddr, defaultPort));
  }
}
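For orientation, a hedged sketch of what getAMRMTokenService computes: a single resolved host:port service string without HA, or a comma-joined list with one entry per configured RM id under HA. The hostname below is a made-up example, and buildTokenService may attempt to resolve it, so treat this strictly as an illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor;

// Sketch: resolving the AMRM token service name on a non-HA cluster
// ("rm1.example.com" is an illustrative, hypothetical host).
public class TokenServiceSketch {
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();
    conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, "rm1.example.com:8030");
    Text service = DefaultRequestInterceptor.getAMRMTokenService(conf);
    // Non-HA: a single service string built from the scheduler address.
    // HA: a comma-joined list, one entry per yarn.resourcemanager.ha.rm-ids id.
    System.out.println(service);
  }
}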
@ -19,14 +19,14 @@
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;

/**
 * Defines the contract to be implemented by the request interceptor classes,
 * which can be used to intercept and inspect messages sent from the
 * application master to the resource manager.
 */
public interface RequestInterceptor extends ApplicationMasterProtocol,
public interface RequestInterceptor extends DistributedSchedulerProtocol,
    Configurable {
  /**
   * This method is called for initializing the interceptor. This is guaranteed
@ -0,0 +1,416 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.nodemanager.scheduler;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyApplicationContext;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AbstractRequestInterceptor;

import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/**
 * <p>The LocalScheduler runs on the NodeManager and is modelled as an
 * <code>AMRMProxy</code> request interceptor. It is responsible for the
 * following:</p>
 * <ul>
 * <li>Intercept <code>ApplicationMasterProtocol</code> calls and unwrap the
 * response objects to extract instructions from the
 * <code>ClusterManager</code> running on the ResourceManager to aid in making
 * scheduling decisions</li>
 * <li>Call the <code>OpportunisticContainerAllocator</code> to allocate
 * containers for the outstanding OPPORTUNISTIC resource requests</li>
 * </ul>
 */
public final class LocalScheduler extends AbstractRequestInterceptor {

  static class PartitionedResourceRequests {
    private List<ResourceRequest> guaranteed = new ArrayList<>();
    private List<ResourceRequest> opportunistic = new ArrayList<>();
    public List<ResourceRequest> getGuaranteed() {
      return guaranteed;
    }
    public List<ResourceRequest> getOpportunistic() {
      return opportunistic;
    }
  }

  static class DistSchedulerParams {
    Resource maxResource;
    Resource minResource;
    Resource incrementResource;
    int containerTokenExpiryInterval;
  }

  private static final Logger LOG = LoggerFactory
      .getLogger(LocalScheduler.class);

  // Currently just used to keep track of allocated Containers
  // Can be used for reporting stats later
  private Set<ContainerId> containersAllocated = new HashSet<>();

  private DistSchedulerParams appParams = new DistSchedulerParams();
  private final OpportunisticContainerAllocator.ContainerIdCounter containerIdCounter =
      new OpportunisticContainerAllocator.ContainerIdCounter();
  private Map<String, NodeId> nodeList = new HashMap<>();

  // Mapping of NodeId to NodeTokens. Populated either from the RM response or
  // generated locally if required.
  private Map<NodeId, NMToken> nodeTokens = new HashMap<>();
  final Set<String> blacklist = new HashSet<>();

  // This maintains a map of outstanding OPPORTUNISTIC reqs, keyed by Priority,
  // Resource Name (host/rack/any) and capability. This mapping is required
  // to match a received Container to an outstanding OPPORTUNISTIC
  // ResourceRequest (ask).
  final TreeMap<Priority, Map<Resource, ResourceRequest>>
      outstandingOpReqs = new TreeMap<>();

  private ApplicationAttemptId applicationAttemptId;
  private OpportunisticContainerAllocator containerAllocator;
  private NMTokenSecretManagerInNM nmSecretManager;
  private String appSubmitter;

  public void init(AMRMProxyApplicationContext appContext) {
    super.init(appContext);
    initLocal(appContext.getApplicationAttemptId(),
        appContext.getNMCotext().getContainerAllocator(),
        appContext.getNMCotext().getNMTokenSecretManager(),
        appContext.getUser());
  }

  @VisibleForTesting
  void initLocal(ApplicationAttemptId applicationAttemptId,
      OpportunisticContainerAllocator containerAllocator,
      NMTokenSecretManagerInNM nmSecretManager, String appSubmitter) {
    this.applicationAttemptId = applicationAttemptId;
    this.containerAllocator = containerAllocator;
    this.nmSecretManager = nmSecretManager;
    this.appSubmitter = appSubmitter;
  }

  /**
   * Route register calls to the corresponding distributed scheduling method,
   * viz. registerApplicationMasterForDistributedScheduling, and return the
   * response to the caller after stripping away the Distributed Scheduling
   * information.
   *
   * @param request registration request
   * @return Register Response
   * @throws YarnException
   * @throws IOException
   */
  @Override
  public RegisterApplicationMasterResponse registerApplicationMaster(
      RegisterApplicationMasterRequest request)
      throws YarnException, IOException {
    return registerApplicationMasterForDistributedScheduling(request)
        .getRegisterResponse();
  }

  /**
   * Route allocate calls to the allocateForDistributedScheduling method and
   * return the response to the caller after stripping away the Distributed
   * Scheduling information.
   *
   * @param request allocation request
   * @return Allocate Response
   * @throws YarnException
   * @throws IOException
   */
  @Override
  public AllocateResponse allocate(AllocateRequest request)
      throws YarnException, IOException {
    return allocateForDistributedScheduling(request).getAllocateResponse();
  }

  @Override
  public FinishApplicationMasterResponse finishApplicationMaster(
      FinishApplicationMasterRequest request)
      throws YarnException, IOException {
    return getNextInterceptor().finishApplicationMaster(request);
  }

  /**
   * Check if we already have an NMToken for each allocated container. If not,
   * generate the token and add it to the response.
   * @param response
   * @param nmTokens
   * @param allocatedContainers
   */
  private void updateResponseWithNMTokens(AllocateResponse response,
      List<NMToken> nmTokens, List<Container> allocatedContainers) {
    List<NMToken> newTokens = new ArrayList<>();
    if (allocatedContainers.size() > 0) {
      response.getAllocatedContainers().addAll(allocatedContainers);
      for (Container alloc : allocatedContainers) {
        if (!nodeTokens.containsKey(alloc.getNodeId())) {
          newTokens.add(nmSecretManager.generateNMToken(appSubmitter, alloc));
        }
      }
      List<NMToken> retTokens = new ArrayList<>(nmTokens);
      retTokens.addAll(newTokens);
      response.setNMTokens(retTokens);
    }
  }

  private PartitionedResourceRequests partitionAskList(
      List<ResourceRequest> askList) {
    PartitionedResourceRequests partitionedRequests =
        new PartitionedResourceRequests();
    for (ResourceRequest rr : askList) {
      if (rr.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
        partitionedRequests.getOpportunistic().add(rr);
      } else {
        partitionedRequests.getGuaranteed().add(rr);
      }
    }
    return partitionedRequests;
  }

  private void updateParameters(
      DistSchedRegisterResponse registerResponse) {
    appParams.minResource = registerResponse.getMinAllocatableCapabilty();
    appParams.maxResource = registerResponse.getMaxAllocatableCapabilty();
    appParams.incrementResource =
        registerResponse.getIncrAllocatableCapabilty();
    if (appParams.incrementResource == null) {
      appParams.incrementResource = appParams.minResource;
    }
    appParams.containerTokenExpiryInterval = registerResponse
        .getContainerTokenExpiryInterval();

    containerIdCounter
        .resetContainerIdCounter(registerResponse.getContainerIdStart());
    setNodeList(registerResponse.getNodesForScheduling());
  }

  /**
   * Takes a list of ResourceRequests (asks), extracts the key information,
   * viz. (Priority, ResourceName, Capability), and adds it to the outstanding
   * OPPORTUNISTIC outstandingOpReqs map. The nested map is required to enforce
   * the current YARN constraint that only a single ResourceRequest can exist
   * at a given Priority and Capability.
   * @param resourceAsks
   */
  public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
    for (ResourceRequest request : resourceAsks) {
      Priority priority = request.getPriority();

      // TODO: Extend for Node/Rack locality. We only handle ANY requests now
      if (!ResourceRequest.isAnyLocation(request.getResourceName())) {
        continue;
      }

      if (request.getNumContainers() == 0) {
        continue;
      }

      Map<Resource, ResourceRequest> reqMap =
          this.outstandingOpReqs.get(priority);
      if (reqMap == null) {
        reqMap = new HashMap<>();
        this.outstandingOpReqs.put(priority, reqMap);
      }

      ResourceRequest resourceRequest = reqMap.get(request.getCapability());
      if (resourceRequest == null) {
        resourceRequest = request;
        reqMap.put(request.getCapability(), request);
      } else {
        resourceRequest.setNumContainers(
            resourceRequest.getNumContainers() + request.getNumContainers());
      }
      if (ResourceRequest.isAnyLocation(request.getResourceName())) {
        LOG.info("# of outstandingOpReqs in ANY (at priority = " + priority
            + ", with capability = " + request.getCapability() + " ) : "
            + resourceRequest.getNumContainers());
      }
    }
  }

  /**
   * This method matches a returned list of Container allocations against any
   * outstanding OPPORTUNISTIC ResourceRequest.
   * @param capability
   * @param allocatedContainers
   */
  public void matchAllocationToOutstandingRequest(Resource capability,
      List<Container> allocatedContainers) {
    for (Container c : allocatedContainers) {
      containersAllocated.add(c.getId());
      Map<Resource, ResourceRequest> asks =
          outstandingOpReqs.get(c.getPriority());

      if (asks == null)
        continue;

      ResourceRequest rr = asks.get(capability);
      if (rr != null) {
        rr.setNumContainers(rr.getNumContainers() - 1);
        if (rr.getNumContainers() == 0) {
          asks.remove(capability);
        }
      }
    }
  }

  private void setNodeList(List<NodeId> nodeList) {
    this.nodeList.clear();
    addToNodeList(nodeList);
  }

  private void addToNodeList(List<NodeId> nodes) {
    for (NodeId n : nodes) {
      this.nodeList.put(n.getHost(), n);
    }
  }

  @Override
  public DistSchedRegisterResponse
      registerApplicationMasterForDistributedScheduling(
          RegisterApplicationMasterRequest request)
          throws YarnException, IOException {
    LOG.info("Forwarding registration request to the " +
        "Distributed Scheduler Service on YARN RM");
    DistSchedRegisterResponse dsResp = getNextInterceptor()
        .registerApplicationMasterForDistributedScheduling(request);
    updateParameters(dsResp);
    return dsResp;
  }

  @Override
  public DistSchedAllocateResponse allocateForDistributedScheduling(
      AllocateRequest request) throws YarnException, IOException {
    LOG.info("Forwarding allocate request to the " +
        "Distributed Scheduler Service on YARN RM");
    // Partition requests into GUARANTEED and OPPORTUNISTIC reqs
    PartitionedResourceRequests partitionedAsks = partitionAskList(request
        .getAskList());

    List<ContainerId> releasedContainers = request.getReleaseList();
    int numReleasedContainers = releasedContainers.size();
    if (numReleasedContainers > 0) {
      LOG.info("AttemptID: " + applicationAttemptId + " released: "
          + numReleasedContainers);
      containersAllocated.removeAll(releasedContainers);
    }

    // Also, update the blacklist
    ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest();
    if (rbr != null) {
      blacklist.removeAll(rbr.getBlacklistRemovals());
      blacklist.addAll(rbr.getBlacklistAdditions());
    }

    // Add OPPORTUNISTIC reqs to the outstanding reqs
    addToOutstandingReqs(partitionedAsks.getOpportunistic());

    List<Container> allocatedContainers = new ArrayList<>();
    for (Priority priority : outstandingOpReqs.descendingKeySet()) {
      // Allocated containers:
      //   Key = requested Capability,
      //   Value = list of Containers of that Capability (the actual container
      //   size might be different than what was requested, which is why we
      //   need the requested capability (key) to match against the
      //   outstanding reqs)
      Map<Resource, List<Container>> allocated =
          containerAllocator.allocate(this.appParams, containerIdCounter,
              outstandingOpReqs.get(priority).values(), blacklist,
              applicationAttemptId, nodeList, appSubmitter);
      for (Map.Entry<Resource, List<Container>> e : allocated.entrySet()) {
        matchAllocationToOutstandingRequest(e.getKey(), e.getValue());
        allocatedContainers.addAll(e.getValue());
      }
    }

    // Send all the GUARANTEED reqs to the RM
    request.setAskList(partitionedAsks.getGuaranteed());
    DistSchedAllocateResponse dsResp =
        getNextInterceptor().allocateForDistributedScheduling(request);

    // Update the host to nodeId mapping
    setNodeList(dsResp.getNodesForScheduling());
    List<NMToken> nmTokens = dsResp.getAllocateResponse().getNMTokens();
    for (NMToken nmToken : nmTokens) {
      nodeTokens.put(nmToken.getNodeId(), nmToken);
    }

    List<ContainerStatus> completedContainers =
        dsResp.getAllocateResponse().getCompletedContainersStatuses();

    // Only account for opportunistic containers
    for (ContainerStatus cs : completedContainers) {
      if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
        containersAllocated.remove(cs.getContainerId());
      }
    }

    // Check if we have NM tokens for all the allocated containers. If not,
    // generate one and update the response.
    updateResponseWithNMTokens(
        dsResp.getAllocateResponse(), nmTokens, allocatedContainers);

    if (LOG.isDebugEnabled()) {
      LOG.debug(
          "Number of opportunistic containers currently allocated by " +
              "application: " + containersAllocated.size());
    }
    return dsResp;
  }
}
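The bookkeeping at the core of the LocalScheduler is the two-level outstandingOpReqs map: a priority maps to (requested capability maps to the aggregated ask), and each allocated container decrements the matching ask until it is removed. A self-contained sketch of that flow, with Integer standing in for Priority and a String for the capability — both stand-ins are assumptions for illustration only:

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Sketch of the outstandingOpReqs bookkeeping in LocalScheduler, with
// Integer standing in for Priority and String for the requested capability.
public class OutstandingReqsSketch {
  public static void main(String[] args) {
    TreeMap<Integer, Map<String, Integer>> outstanding = new TreeMap<>();

    // addToOutstandingReqs: accumulate container counts per (priority, capability)
    outstanding.computeIfAbsent(100, p -> new HashMap<>())
        .merge("<2048MB,2vcores>", 4, Integer::sum);

    // matchAllocationToOutstandingRequest: one container of that capability
    // arrives, so decrement, and drop the entry once it reaches zero
    Map<String, Integer> asks = outstanding.get(100);
    asks.merge("<2048MB,2vcores>", -1, Integer::sum);
    if (asks.get("<2048MB,2vcores>") == 0) {
      asks.remove("<2048MB,2vcores>");
    }
    System.out.println(outstanding); // {100={<2048MB,2vcores>=3}}
  }
}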
@ -0,0 +1,185 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.nodemanager.scheduler;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.scheduler.LocalScheduler.DistSchedulerParams;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;

/**
 * <p>The OpportunisticContainerAllocator allocates containers on a given list
 * of Nodes after it modifies the container sizes to be within the allowable
 * limits specified by the <code>ClusterManager</code> running on the RM. It
 * tries to distribute the containers as evenly as possible. It also uses the
 * <code>NMTokenSecretManagerInNM</code> to generate the required NM tokens
 * for the allocated containers.</p>
 */
public class OpportunisticContainerAllocator {

  private static final Log LOG =
      LogFactory.getLog(OpportunisticContainerAllocator.class);

  private static final ResourceCalculator RESOURCE_CALCULATOR =
      new DominantResourceCalculator();

  static class ContainerIdCounter {
    final AtomicLong containerIdCounter = new AtomicLong(1);

    void resetContainerIdCounter(long containerIdStart) {
      this.containerIdCounter.set(containerIdStart);
    }

    long generateContainerId() {
      return this.containerIdCounter.decrementAndGet();
    }
  }

  private final NodeStatusUpdater nodeStatusUpdater;
  private final Context context;
  private int webpagePort;

  public OpportunisticContainerAllocator(NodeStatusUpdater nodeStatusUpdater,
      Context context, int webpagePort) {
    this.nodeStatusUpdater = nodeStatusUpdater;
    this.context = context;
    this.webpagePort = webpagePort;
  }

  public Map<Resource, List<Container>> allocate(DistSchedulerParams appParams,
      ContainerIdCounter idCounter, Collection<ResourceRequest> resourceAsks,
      Set<String> blacklist, ApplicationAttemptId appAttId,
      Map<String, NodeId> allNodes, String userName) throws YarnException {
    Map<Resource, List<Container>> containers = new HashMap<>();
    Set<String> nodesAllocated = new HashSet<>();
    int numAsks = resourceAsks.size();
    for (ResourceRequest anyAsk : resourceAsks) {
      allocateOpportunisticContainers(appParams, idCounter, blacklist, appAttId,
          allNodes, userName, containers, nodesAllocated, anyAsk);
    }
    if (numAsks > 0) {
      LOG.info("Opportunistic allocation requested for: " + numAsks
          + " containers; allocated = " + containers.size());
    }
    return containers;
  }

  private void allocateOpportunisticContainers(DistSchedulerParams appParams,
      ContainerIdCounter idCounter, Set<String> blacklist,
      ApplicationAttemptId id, Map<String, NodeId> allNodes, String userName,
      Map<Resource, List<Container>> containers, Set<String> nodesAllocated,
      ResourceRequest anyAsk) throws YarnException {
    int toAllocate = anyAsk.getNumContainers()
        - (containers.isEmpty() ?
            0 : containers.get(anyAsk.getCapability()).size());

    List<String> topKNodesLeft = new ArrayList<>();
    for (String s : allNodes.keySet()) {
      // Bias away from whatever we have already allocated and respect blacklist
      if (nodesAllocated.contains(s) || blacklist.contains(s)) {
        continue;
      }
      topKNodesLeft.add(s);
    }
    int numAllocated = 0;
    int nextNodeToAllocate = 0;
    for (int numCont = 0; numCont < toAllocate; numCont++) {
      String topNode = topKNodesLeft.get(nextNodeToAllocate);
      nextNodeToAllocate++;
      nextNodeToAllocate %= topKNodesLeft.size();
      NodeId nodeId = allNodes.get(topNode);
      Container container = buildContainer(appParams, idCounter, anyAsk, id,
          userName, nodeId);
      List<Container> cList = containers.get(anyAsk.getCapability());
      if (cList == null) {
        cList = new ArrayList<>();
        containers.put(anyAsk.getCapability(), cList);
      }
      cList.add(container);
      numAllocated++;
      LOG.info("Allocated " + numAllocated + " opportunistic containers.");
    }
  }

  private Container buildContainer(DistSchedulerParams appParams,
      ContainerIdCounter idCounter, ResourceRequest rr, ApplicationAttemptId id,
      String userName, NodeId nodeId) throws YarnException {
    ContainerId cId =
        ContainerId.newContainerId(id, idCounter.generateContainerId());

    // Normalize the resource asks (similar to what the RM scheduler does
    // before accepting an ask)
    Resource capability = normalizeCapability(appParams, rr);

    long currTime = System.currentTimeMillis();
    ContainerTokenIdentifier containerTokenIdentifier =
        new ContainerTokenIdentifier(
            cId, nodeId.getHost(), userName, capability,
            currTime + appParams.containerTokenExpiryInterval,
            context.getContainerTokenSecretManager().getCurrentKey().getKeyId(),
            nodeStatusUpdater.getRMIdentifier(), rr.getPriority(), currTime,
            null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK,
            ExecutionType.OPPORTUNISTIC);
    byte[] pwd =
        context.getContainerTokenSecretManager().createPassword(
            containerTokenIdentifier);
    Token containerToken = newContainerToken(nodeId, pwd,
        containerTokenIdentifier);
    Container container = BuilderUtils.newContainer(
        cId, nodeId, nodeId.getHost() + ":" + webpagePort,
        capability, rr.getPriority(), containerToken);
    return container;
  }

  private Resource normalizeCapability(DistSchedulerParams appParams,
      ResourceRequest ask) {
    return Resources.normalize(RESOURCE_CALCULATOR,
        ask.getCapability(), appParams.minResource, appParams.maxResource,
        appParams.incrementResource);
  }

  public static Token newContainerToken(NodeId nodeId, byte[] password,
      ContainerTokenIdentifier tokenIdentifier) {
    // RPC layer client expects ip:port as service for tokens
    InetSocketAddress addr = NetUtils.createSocketAddrForHost(nodeId.getHost(),
        nodeId.getPort());
    // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token
    Token containerToken = Token.newInstance(tokenIdentifier.getBytes(),
        ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil
            .buildTokenService(addr).toString());
    return containerToken;
  }

}
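The even spread that the class comment promises comes from the wrap-around index in allocateOpportunisticContainers: the candidate index advances once per container and wraps modulo the node list size. A tiny standalone sketch of that selection — the node names mirror the ones used in TestLocalScheduler below:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the wrap-around node selection above: the index advances once
// per container and wraps modulo the candidate list, which is what spreads
// an ask of N containers evenly across the nodes.
public class RoundRobinSketch {
  public static void main(String[] args) {
    List<String> nodes = Arrays.asList("a", "b");
    Map<String, Integer> perNode = new HashMap<>();
    int next = 0;
    for (int i = 0; i < 4; i++) {          // toAllocate = 4
      String node = nodes.get(next);
      next = (next + 1) % nodes.size();
      perNode.merge(node, 1, Integer::sum);
    }
    System.out.println(perNode);           // {a=2, b=2}
  }
}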
@ -29,7 +29,10 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
@ -50,7 +53,7 @@ public class NMTokenSecretManagerInNM extends BaseNMTokenSecretManager {
  private final Map<ApplicationAttemptId, MasterKeyData> oldMasterKeys;
  private final Map<ApplicationId, List<ApplicationAttemptId>> appToAppAttemptMap;
  private final NMStateStoreService stateStore;
  private NodeId nodeId;
  private NodeId nodeId;

  public NMTokenSecretManagerInNM() {
    this(new NMNullStateStoreService());
@ -276,4 +279,23 @@ public class NMTokenSecretManagerInNM extends BaseNMTokenSecretManager {
      LOG.error("Unable to remove master key for application " + attempt, e);
    }
  }

  /**
   * Used by the Distributed Scheduler framework to generate NMTokens.
   * @param applicationSubmitter
   * @param container
   * @return NMToken
   */
  public NMToken generateNMToken(
      String applicationSubmitter, Container container) {
    this.readLock.lock();
    try {
      Token token =
          createNMToken(container.getId().getApplicationAttemptId(),
              container.getNodeId(), applicationSubmitter);
      return NMToken.newInstance(container.getNodeId(), token);
    } finally {
      this.readLock.unlock();
    }
  }
}
@ -80,7 +80,7 @@ public class TestEventFlow {

    Context context = new NMContext(new NMContainerTokenSecretManager(conf),
        new NMTokenSecretManagerInNM(), null, null,
        new NMNullStateStoreService()) {
        new NMNullStateStoreService(), false) {
      @Override
      public int getHttpPort() {
        return 1234;

@ -1583,7 +1583,7 @@ public class TestNodeStatusUpdater {
  protected NMContext createNMContext(
      NMContainerTokenSecretManager containerTokenSecretManager,
      NMTokenSecretManagerInNM nmTokenSecretManager,
      NMStateStoreService store) {
      NMStateStoreService store, boolean isDistributedSchedulingEnabled) {
    return new MyNMContext(containerTokenSecretManager,
        nmTokenSecretManager);
  }
@ -1818,7 +1818,7 @@ public class TestNodeStatusUpdater {
        NMContainerTokenSecretManager containerTokenSecretManager,
        NMTokenSecretManagerInNM nmTokenSecretManager) {
      super(containerTokenSecretManager, nmTokenSecretManager, null, null,
          new NMNullStateStoreService());
          new NMNullStateStoreService(), false);
    }

    @Override

@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@ -688,5 +689,14 @@ public abstract class BaseAMRMProxyTest {
    public QueuingContext getQueuingContext() {
      return null;
    }

    public boolean isDistributedSchedulingEnabled() {
      return false;
    }

    @Override
    public OpportunisticContainerAllocator getContainerAllocator() {
      return null;
    }
  }
}

@ -110,7 +110,7 @@ public abstract class BaseContainerManagerTest {
  protected Configuration conf = new YarnConfiguration();
  protected Context context = new NMContext(new NMContainerTokenSecretManager(
      conf), new NMTokenSecretManagerInNM(), null,
      new ApplicationACLsManager(conf), new NMNullStateStoreService()) {
      new ApplicationACLsManager(conf), new NMNullStateStoreService(), false) {
    public int getHttpPort() {
      return HTTP_PORT;
    };

@ -559,7 +559,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
      NMStateStoreService stateStore) {
    NMContext context = new NMContext(new NMContainerTokenSecretManager(
        conf), new NMTokenSecretManagerInNM(), null,
        new ApplicationACLsManager(conf), stateStore){
        new ApplicationACLsManager(conf), stateStore, false){
      public int getHttpPort() {
        return HTTP_PORT;
      }

@ -113,7 +113,7 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
  private static final String INVALID_JAVA_HOME = "/no/jvm/here";
  protected Context distContext = new NMContext(new NMContainerTokenSecretManager(
      conf), new NMTokenSecretManagerInNM(), null,
      new ApplicationACLsManager(conf), new NMNullStateStoreService()) {
      new ApplicationACLsManager(conf), new NMNullStateStoreService(), false) {
    public int getHttpPort() {
      return HTTP_PORT;
    };

@ -81,7 +81,8 @@ public class TestLocalCacheDirectoryManager {
    NMContext nmContext =
        new NMContext(new NMContainerTokenSecretManager(conf),
            new NMTokenSecretManagerInNM(), null,
            new ApplicationACLsManager(conf), new NMNullStateStoreService());
            new ApplicationACLsManager(conf), new NMNullStateStoreService(),
            false);
    ResourceLocalizationService service =
        new ResourceLocalizationService(null, null, null, null, nmContext);
    try {

@ -186,7 +186,7 @@ public class TestResourceLocalizationService {
    conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
    nmContext = new NMContext(new NMContainerTokenSecretManager(
        conf), new NMTokenSecretManagerInNM(), null,
        new ApplicationACLsManager(conf), new NMNullStateStoreService());
        new ApplicationACLsManager(conf), new NMNullStateStoreService(), false);
  }

  @After
@ -2365,7 +2365,7 @@ public class TestResourceLocalizationService {
    NMContext nmContext =
        new NMContext(new NMContainerTokenSecretManager(conf),
            new NMTokenSecretManagerInNM(), null,
            new ApplicationACLsManager(conf), stateStore);
            new ApplicationACLsManager(conf), stateStore, false);
    ResourceLocalizationService rawService =
        new ResourceLocalizationService(dispatcher, exec, delService,
            dirsHandler, nmContext);
@ -0,0 +1,212 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.nodemanager.scheduler;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TestLocalScheduler {

  @Test
  public void testLocalScheduler() throws Exception {

    Configuration conf = new Configuration();
    LocalScheduler localScheduler = new LocalScheduler();

    NodeStatusUpdater nodeStatusUpdater = Mockito.mock(NodeStatusUpdater.class);
    Mockito.when(nodeStatusUpdater.getRMIdentifier()).thenReturn(12345L);
    Context context = Mockito.mock(Context.class);
    NMContainerTokenSecretManager nmContainerTokenSecretManager = new
        NMContainerTokenSecretManager(conf);
    MasterKey mKey = new MasterKey() {
      @Override
      public int getKeyId() {
        return 1;
      }
      @Override
      public void setKeyId(int keyId) {}
      @Override
      public ByteBuffer getBytes() {
        return ByteBuffer.allocate(8);
      }
      @Override
      public void setBytes(ByteBuffer bytes) {}
    };
    nmContainerTokenSecretManager.setMasterKey(mKey);
    Mockito.when(context.getContainerTokenSecretManager()).thenReturn
        (nmContainerTokenSecretManager);
    OpportunisticContainerAllocator containerAllocator =
        new OpportunisticContainerAllocator(nodeStatusUpdater, context, 7777);

    NMTokenSecretManagerInNM nmTokenSecretManagerInNM =
        new NMTokenSecretManagerInNM();
    nmTokenSecretManagerInNM.setMasterKey(mKey);
    localScheduler.initLocal(
        ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1),
        containerAllocator, nmTokenSecretManagerInNM, "test");

    RequestInterceptor finalReqIntcptr = Mockito.mock(RequestInterceptor.class);
    localScheduler.setNextInterceptor(finalReqIntcptr);

    DistSchedRegisterResponse distSchedRegisterResponse =
        Records.newRecord(DistSchedRegisterResponse.class);
    distSchedRegisterResponse.setRegisterResponse(
        Records.newRecord(RegisterApplicationMasterResponse.class));
    distSchedRegisterResponse.setContainerTokenExpiryInterval(12345);
    distSchedRegisterResponse.setContainerIdStart(0);
    distSchedRegisterResponse.setMaxAllocatableCapabilty(
        Resource.newInstance(1024, 4));
    distSchedRegisterResponse.setMinAllocatableCapabilty(
        Resource.newInstance(512, 2));
    distSchedRegisterResponse.setNodesForScheduling(Arrays.asList(
        NodeId.newInstance("a", 1), NodeId.newInstance("b", 2)));
    Mockito.when(
        finalReqIntcptr.registerApplicationMasterForDistributedScheduling(
            Mockito.any(RegisterApplicationMasterRequest.class)))
        .thenReturn(distSchedRegisterResponse);

    localScheduler.registerApplicationMaster(
        Records.newRecord(RegisterApplicationMasterRequest.class));

    Mockito.when(
        finalReqIntcptr.allocateForDistributedScheduling(
            Mockito.any(AllocateRequest.class)))
        .thenAnswer(new Answer<DistSchedAllocateResponse>() {
          @Override
          public DistSchedAllocateResponse answer(InvocationOnMock
              invocationOnMock) throws Throwable {
            return createAllocateResponse(Arrays.asList(
                NodeId.newInstance("c", 3), NodeId.newInstance("d", 4)));
          }
        });

    AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
    ResourceRequest guaranteedReq = Records.newRecord(ResourceRequest.class);
    guaranteedReq.setExecutionType(ExecutionType.GUARANTEED);
    guaranteedReq.setNumContainers(5);
    guaranteedReq.setCapability(Resource.newInstance(2048, 2));
    guaranteedReq.setRelaxLocality(true);
    guaranteedReq.setResourceName("*");
    ResourceRequest opportunisticReq = Records.newRecord(ResourceRequest.class);
    opportunisticReq.setExecutionType(ExecutionType.OPPORTUNISTIC);
    opportunisticReq.setNumContainers(4);
    opportunisticReq.setCapability(Resource.newInstance(1024, 4));
    opportunisticReq.setPriority(Priority.newInstance(100));
    opportunisticReq.setRelaxLocality(true);
    opportunisticReq.setResourceName("*");
    allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));

    // Verify 4 containers were allocated
    AllocateResponse allocateResponse = localScheduler.allocate(allocateRequest);
    Assert.assertEquals(4, allocateResponse.getAllocatedContainers().size());

    // Verify equal distribution on hosts a and b, and none on c and d
    Map<NodeId, List<ContainerId>> allocs = mapAllocs(allocateResponse);
    Assert.assertEquals(2, allocs.get(NodeId.newInstance("a", 1)).size());
    Assert.assertEquals(2, allocs.get(NodeId.newInstance("b", 2)).size());
    Assert.assertNull(allocs.get(NodeId.newInstance("c", 3)));
    Assert.assertNull(allocs.get(NodeId.newInstance("d", 4)));

    // New allocate request
    allocateRequest = Records.newRecord(AllocateRequest.class);
    opportunisticReq = Records.newRecord(ResourceRequest.class);
    opportunisticReq.setExecutionType(ExecutionType.OPPORTUNISTIC);
    opportunisticReq.setNumContainers(6);
    opportunisticReq.setCapability(Resource.newInstance(512, 3));
    opportunisticReq.setPriority(Priority.newInstance(100));
    opportunisticReq.setRelaxLocality(true);
    opportunisticReq.setResourceName("*");
    allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));

    // Verify 6 containers were allocated
    allocateResponse = localScheduler.allocate(allocateRequest);
    Assert.assertEquals(6, allocateResponse.getAllocatedContainers().size());

    // Verify the new containers are equally distributed on hosts c and d,
    // and none on a and b
    allocs = mapAllocs(allocateResponse);
    Assert.assertEquals(3, allocs.get(NodeId.newInstance("c", 3)).size());
    Assert.assertEquals(3, allocs.get(NodeId.newInstance("d", 4)).size());
    Assert.assertNull(allocs.get(NodeId.newInstance("a", 1)));
    Assert.assertNull(allocs.get(NodeId.newInstance("b", 2)));
  }

  private DistSchedAllocateResponse createAllocateResponse(List<NodeId> nodes) {
    DistSchedAllocateResponse distSchedAllocateResponse = Records.newRecord
        (DistSchedAllocateResponse.class);
    distSchedAllocateResponse.setAllocateResponse(
        Records.newRecord(AllocateResponse.class));
    distSchedAllocateResponse.setNodesForScheduling(nodes);
    return distSchedAllocateResponse;
  }

  private Map<NodeId, List<ContainerId>> mapAllocs(AllocateResponse
      allocateResponse) {
    Map<NodeId, List<ContainerId>> allocs = new HashMap<>();
    for (Container c : allocateResponse.getAllocatedContainers()) {
      List<ContainerId> cIds = allocs.get(c.getNodeId());
      if (cIds == null) {
        cIds = new ArrayList<>();
        allocs.put(c.getNodeId(), cIds);
      }
      cIds.add(c.getId());
    }
    return allocs;
  }

}
@ -96,7 +96,7 @@ public class TestContainerLogsPage {
|
|||
healthChecker.init(conf);
|
||||
LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler();
|
||||
NMContext nmContext = new NodeManager.NMContext(null, null, dirsHandler,
|
||||
new ApplicationACLsManager(conf), new NMNullStateStoreService());
|
||||
new ApplicationACLsManager(conf), new NMNullStateStoreService(), false);
|
||||
// Add an application and the corresponding containers
|
||||
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(conf);
|
||||
String user = "nobody";
|
||||
|
@ -136,7 +136,7 @@ public class TestContainerLogsPage {
|
|||
when(dirsHandlerForFullDisk.getLogDirsForRead()).
|
||||
thenReturn(Arrays.asList(new String[] {absLogDir.getAbsolutePath()}));
|
||||
nmContext = new NodeManager.NMContext(null, null, dirsHandlerForFullDisk,
|
||||
new ApplicationACLsManager(conf), new NMNullStateStoreService());
|
||||
new ApplicationACLsManager(conf), new NMNullStateStoreService(), false);
|
||||
nmContext.getApplications().put(appId, app);
|
||||
container.setState(ContainerState.RUNNING);
|
||||
nmContext.getContainers().put(container1, container);
|
||||
|
@ -158,7 +158,7 @@ public class TestContainerLogsPage {
|
|||
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
|
||||
dirsHandler.init(conf);
|
||||
NMContext nmContext = new NodeManager.NMContext(null, null, dirsHandler,
|
||||
new ApplicationACLsManager(conf), new NMNullStateStoreService());
|
||||
new ApplicationACLsManager(conf), new NMNullStateStoreService(), false);
|
||||
// Add an application and the corresponding containers
|
||||
String user = "nobody";
|
||||
long clusterTimeStamp = 1234;
|
||||
|
|
|
@ -62,7 +62,8 @@ public class TestNMAppsPage {
|
|||
Configuration conf = new Configuration();
|
||||
final NMContext nmcontext = new NMContext(
|
||||
new NMContainerTokenSecretManager(conf), new NMTokenSecretManagerInNM(),
|
||||
null, new ApplicationACLsManager(conf), new NMNullStateStoreService());
|
||||
null, new ApplicationACLsManager(conf), new NMNullStateStoreService(),
|
||||
false);
|
||||
Injector injector = WebAppTests.createMockInjector(NMContext.class,
|
||||
nmcontext, new Module() {
|
||||
@Override
|
||||
|
|
|
@ -87,7 +87,7 @@ public class TestNMWebServer {
|
|||
|
||||
private int startNMWebAppServer(String webAddr) {
|
||||
Context nmContext = new NodeManager.NMContext(null, null, null, null,
|
||||
null);
|
||||
null, false);
|
||||
ResourceView resourceView = new ResourceView() {
|
||||
@Override
|
||||
public long getVmemAllocatedForContainers() {
|
||||
|
@ -150,7 +150,7 @@ public class TestNMWebServer {
|
|||
@Test
|
||||
public void testNMWebApp() throws IOException, YarnException {
|
||||
Context nmContext = new NodeManager.NMContext(null, null, null, null,
|
||||
null);
|
||||
null, false);
|
||||
ResourceView resourceView = new ResourceView() {
|
||||
@Override
|
||||
public long getVmemAllocatedForContainers() {
|
||||
|
|
|
@@ -110,7 +110,7 @@ public class TestNMWebServices extends JerseyTestBase {
     healthChecker.init(conf);
     aclsManager = new ApplicationACLsManager(conf);
     nmContext = new NodeManager.NMContext(null, null, dirsHandler,
-        aclsManager, null);
+        aclsManager, null, false);
     NodeId nodeId = NodeId.newInstance("testhost.foo.com", 8042);
     ((NodeManager.NMContext)nmContext).setNodeId(nodeId);
     resourceView = new ResourceView() {
@@ -104,7 +104,7 @@ public class TestNMWebServicesApps extends JerseyTestBase {
     dirsHandler = healthChecker.getDiskHandler();
     aclsManager = new ApplicationACLsManager(conf);
     nmContext = new NodeManager.NMContext(null, null, dirsHandler,
-        aclsManager, null);
+        aclsManager, null, false);
     NodeId nodeId = NodeId.newInstance("testhost.foo.com", 9999);
     ((NodeManager.NMContext)nmContext).setNodeId(nodeId);
     resourceView = new ResourceView() {
@@ -136,7 +136,7 @@ public class TestNMWebServicesContainers extends JerseyTestBase {
     dirsHandler = healthChecker.getDiskHandler();
     aclsManager = new ApplicationACLsManager(conf);
     nmContext = new NodeManager.NMContext(null, null, dirsHandler,
-        aclsManager, null) {
+        aclsManager, null, false) {
       public NodeId getNodeId() {
         return NodeId.newInstance("testhost.foo.com", 8042);
       };
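All of the NodeManager test fixtures above absorb the same signature change: NMContext grew a trailing boolean argument, passed as false in every test. The diff never shows the parameter declaration itself, so its name and meaning are assumed in this minimal sketch of the new call shape:

    // Sketch only: the trailing boolean's declared name is not visible in this
    // diff; it presumably gates the NM-side distributed-scheduling support
    // that this commit series introduces.
    Context nmContext = new NodeManager.NMContext(
        null /* containerTokenSecretManager */,
        null /* nmTokenSecretManager */,
        dirsHandler,
        new ApplicationACLsManager(conf),
        new NMNullStateStoreService(),
        false /* distributed scheduling off, as in these tests */);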
@@ -48,6 +48,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -89,6 +91,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.security
+    .AMRMTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
 import org.apache.hadoop.yarn.server.security.MasterKeyData;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -104,21 +108,27 @@ public class ApplicationMasterService extends AbstractService implements
   private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class);
   private final AMLivelinessMonitor amLivelinessMonitor;
   private YarnScheduler rScheduler;
-  private InetSocketAddress masterServiceAddress;
-  private Server server;
-  private final RecordFactory recordFactory =
+  protected InetSocketAddress masterServiceAddress;
+  protected Server server;
+  protected final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
   private final ConcurrentMap<ApplicationAttemptId, AllocateResponseLock> responseMap =
       new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();
-  private final RMContext rmContext;
+  protected final RMContext rmContext;

-  public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
-    super(ApplicationMasterService.class.getName());
+  public ApplicationMasterService(String name, RMContext rmContext,
+      YarnScheduler scheduler) {
+    super(name);
     this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
     this.rScheduler = scheduler;
     this.rmContext = rmContext;
   }

+  public ApplicationMasterService(RMContext rmContext,
+      YarnScheduler scheduler) {
+    this(ApplicationMasterService.class.getName(), rmContext, scheduler);
+  }
+
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     masterServiceAddress = conf.getSocketAddr(
@@ -139,11 +149,8 @@ public class ApplicationMasterService extends AbstractService implements
       serverConf.set(
           CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
           SaslRpcServer.AuthMethod.TOKEN.toString());
-    this.server =
-        rpc.getServer(ApplicationMasterProtocol.class, this, masterServiceAddress,
-            serverConf, this.rmContext.getAMRMTokenSecretManager(),
-            serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
-                YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
+    this.server = getServer(rpc, serverConf, masterServiceAddress,
+        this.rmContext.getAMRMTokenSecretManager());

     // Enable service authorization?
     if (conf.getBoolean(
@@ -158,7 +165,7 @@ public class ApplicationMasterService extends AbstractService implements
       }
       refreshServiceAcls(conf, RMPolicyProvider.getInstance());
     }
-
+
     this.server.start();
     this.masterServiceAddress =
         conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
@@ -168,6 +175,14 @@ public class ApplicationMasterService extends AbstractService implements
     super.serviceStart();
   }

+  protected Server getServer(YarnRPC rpc, Configuration serverConf,
+      InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
+    return rpc.getServer(ApplicationMasterProtocol.class, this, addr,
+        serverConf, secretManager,
+        serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
+  }
+
   @Private
   public InetSocketAddress getBindAddress() {
     return this.masterServiceAddress;
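Extracting getServer() into a protected hook is the pivotal refactor here: serviceStart() no longer hard-codes the RPC protocol, so a subclass can bind a different one to the same scheduler endpoint. A minimal sketch of such an override (the subclass is hypothetical; DistributedSchedulingService in the next file is the real use):

    // Hypothetical subclass illustrating the hook. A real subclass would swap
    // the protocol class passed to rpc.getServer() for its own, exactly as
    // DistributedSchedulingService does below.
    public class CustomAMService extends ApplicationMasterService {
      public CustomAMService(RMContext rmContext, YarnScheduler scheduler) {
        super(CustomAMService.class.getName(), rmContext, scheduler);
      }

      @Override
      protected Server getServer(YarnRPC rpc, Configuration serverConf,
          InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
        // Same thread-count wiring as the base class; only the protocol differs.
        return rpc.getServer(ApplicationMasterProtocol.class, this, addr,
            serverConf, secretManager,
            serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
                YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
      }
    }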
@@ -0,0 +1,162 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;

import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security
    .AMRMTokenSecretManager;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;

public class DistributedSchedulingService extends ApplicationMasterService
    implements DistributedSchedulerProtocol {

  public DistributedSchedulingService(RMContext rmContext,
      YarnScheduler scheduler) {
    super(DistributedSchedulingService.class.getName(), rmContext, scheduler);
  }

  @Override
  public Server getServer(YarnRPC rpc, Configuration serverConf,
      InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
    Server server = rpc.getServer(DistributedSchedulerProtocol.class, this,
        addr, serverConf, secretManager,
        serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
            YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
    // To support applications running on NMs that DO NOT support
    // Dist Scheduling...
    ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
        ApplicationMasterProtocolPB.class,
        ApplicationMasterProtocolService.newReflectiveBlockingService(
            new ApplicationMasterProtocolPBServiceImpl(this)));
    return server;
  }

  @Override
  public RegisterApplicationMasterResponse registerApplicationMaster
      (RegisterApplicationMasterRequest request) throws YarnException,
      IOException {
    return super.registerApplicationMaster(request);
  }

  @Override
  public FinishApplicationMasterResponse finishApplicationMaster
      (FinishApplicationMasterRequest request) throws YarnException,
      IOException {
    return super.finishApplicationMaster(request);
  }

  @Override
  public AllocateResponse allocate(AllocateRequest request) throws
      YarnException, IOException {
    return super.allocate(request);
  }

  @Override
  public DistSchedRegisterResponse
      registerApplicationMasterForDistributedScheduling(
      RegisterApplicationMasterRequest request) throws YarnException,
      IOException {
    RegisterApplicationMasterResponse response =
        registerApplicationMaster(request);
    DistSchedRegisterResponse dsResp = recordFactory
        .newRecordInstance(DistSchedRegisterResponse.class);
    dsResp.setRegisterResponse(response);
    dsResp.setMinAllocatableCapabilty(
        Resource.newInstance(
            getConfig().getInt(
                YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY,
                YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY_DEFAULT),
            getConfig().getInt(
                YarnConfiguration.DIST_SCHEDULING_MIN_VCORES,
                YarnConfiguration.DIST_SCHEDULING_MIN_VCORES_DEFAULT)
        )
    );
    dsResp.setMaxAllocatableCapabilty(
        Resource.newInstance(
            getConfig().getInt(
                YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY,
                YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_DEFAULT),
            getConfig().getInt(
                YarnConfiguration.DIST_SCHEDULING_MAX_VCORES,
                YarnConfiguration.DIST_SCHEDULING_MAX_VCORES_DEFAULT)
        )
    );
    dsResp.setIncrAllocatableCapabilty(
        Resource.newInstance(
            getConfig().getInt(
                YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY,
                YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY_DEFAULT),
            getConfig().getInt(
                YarnConfiguration.DIST_SCHEDULING_INCR_VCORES,
                YarnConfiguration.DIST_SCHEDULING_INCR_VCORES_DEFAULT)
        )
    );
    dsResp.setContainerTokenExpiryInterval(
        getConfig().getInt(
            YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS,
            YarnConfiguration.
                DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT));
    dsResp.setContainerIdStart(
        this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT);

    // Set nodes to be used for scheduling
    // TODO: The actual computation of the list will happen in YARN-4412
    // TODO: Till then, send the complete list
    dsResp.setNodesForScheduling(
        new ArrayList<>(this.rmContext.getRMNodes().keySet()));
    return dsResp;
  }

  @Override
  public DistSchedAllocateResponse allocateForDistributedScheduling
      (AllocateRequest request) throws YarnException, IOException {
    AllocateResponse response = allocate(request);
    DistSchedAllocateResponse dsResp = recordFactory.newRecordInstance
        (DistSchedAllocateResponse.class);
    dsResp.setAllocateResponse(response);
    dsResp.setNodesForScheduling(
        new ArrayList<>(this.rmContext.getRMNodes().keySet()));
    return dsResp;
  }
}
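Taken together, registerApplicationMasterForDistributedScheduling piggybacks on the normal registration and then hands the AM-side allocator everything it needs to schedule locally. A sketch of the consumer side (proxy creation elided; the getter names are assumed to mirror the setters above):

    // What an AMRMProxy-hosted allocator can read back at registration time.
    DistSchedRegisterResponse resp =
        dsProxy.registerApplicationMasterForDistributedScheduling(
            RegisterApplicationMasterRequest.newInstance("host", -1, ""));
    Resource min = resp.getMinAllocatableCapabilty();   // 512 MB / 1 vcore by default
    Resource max = resp.getMaxAllocatableCapabilty();   // 2048 MB / 4 vcores by default
    long firstContainerId = resp.getContainerIdStart(); // epoch << EPOCH_BIT_SHIFT
    List<NodeId> nodes = resp.getNodesForScheduling();  // full node list until YARN-4412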
@@ -135,6 +135,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
    */
   public static final int SHUTDOWN_HOOK_PRIORITY = 30;

+  /**
+   * Used for generation of various ids.
+   */
+  public static final int EPOCH_BIT_SHIFT = 40;
+
   private static final Log LOG = LogFactory.getLog(ResourceManager.class);
   private static long clusterTimeStamp = System.currentTimeMillis();

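EPOCH_BIT_SHIFT = 40 partitions the 64-bit container-id space by RM epoch: each RM restart (and each id block handed to a distributed scheduler via setContainerIdStart above) begins 2^40 ids past the previous one, so ids minted in different epochs cannot collide. A small worked example:

    // Each epoch owns a disjoint block of 2^40 = 1,099,511,627,776 ids.
    long epoch0Start = 0L << 40;  // 0
    long epoch1Start = 1L << 40;  // 1099511627776
    long epoch2Start = 2L << 40;  // 2199023255552
    // A counter seeded at epochNStart (as AppSchedulingInfo does below) can
    // mint over a trillion ids before reaching epoch N+1's block.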
@@ -1238,6 +1243,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
   }

   protected ApplicationMasterService createApplicationMasterService() {
+    if (this.rmContext.getYarnConfiguration().getBoolean(
+        YarnConfiguration.DIST_SCHEDULING_ENABLED,
+        YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
+      return new DistributedSchedulingService(this.rmContext, scheduler);
+    }
     return new ApplicationMasterService(this.rmContext, scheduler);
   }

@@ -93,7 +93,8 @@ public class AppSchedulingInfo {
     this.queue = queue;
     this.user = user;
     this.activeUsersManager = activeUsersManager;
-    this.containerIdCounter = new AtomicLong(epoch << EPOCH_BIT_SHIFT);
+    this.containerIdCounter =
+        new AtomicLong(epoch << EPOCH_BIT_SHIFT);
     this.appResourceUsage = appResourceUsage;
   }

@@ -93,6 +93,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+
+
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
 import org.apache.log4j.Level;
@@ -810,6 +812,21 @@ public class MockRM extends ResourceManager {

   @Override
   protected ApplicationMasterService createApplicationMasterService() {
+    if (this.rmContext.getYarnConfiguration().getBoolean(
+        YarnConfiguration.DIST_SCHEDULING_ENABLED,
+        YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
+      return new DistributedSchedulingService(getRMContext(), scheduler) {
+        @Override
+        protected void serviceStart() {
+          // override to not start rpc handler
+        }
+
+        @Override
+        protected void serviceStop() {
+          // don't do anything
+        }
+      };
+    }
     return new ApplicationMasterService(getRMContext(), scheduler) {
       @Override
       protected void serviceStart() {
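Because MockRM mirrors ResourceManager.createApplicationMasterService() exactly (minus the RPC lifecycle), tests can exercise the new service with the same one-line switch an operator would use. A minimal sketch:

    // Flipping the flag swaps in DistributedSchedulingService; under MockRM the
    // serviceStart/serviceStop stubs above keep it from binding a real port.
    Configuration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
    MockRM rm = new MockRM(conf);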
@@ -0,0 +1,170 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;

import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt
    .AMLivelinessMonitor;

import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;

public class TestDistributedSchedulingService {

  // Test if the DistributedSchedulingService can handle both DSProtocol as
  // well as AMProtocol clients
  @Test
  public void testRPCWrapping() throws Exception {
    Configuration conf = new Configuration();
    conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class
        .getName());
    YarnRPC rpc = YarnRPC.create(conf);
    String bindAddr = "localhost:0";
    InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
    conf.setSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, addr);
    final RecordFactory factory = RecordFactoryProvider.getRecordFactory(null);
    final RMContext rmContext = new RMContextImpl() {
      @Override
      public AMLivelinessMonitor getAMLivelinessMonitor() {
        return null;
      }
    };
    DistributedSchedulingService service =
        new DistributedSchedulingService(rmContext, null) {
          @Override
          public RegisterApplicationMasterResponse registerApplicationMaster
              (RegisterApplicationMasterRequest request) throws
              YarnException, IOException {
            RegisterApplicationMasterResponse resp = factory.newRecordInstance(
                RegisterApplicationMasterResponse.class);
            // Dummy Entry to Assert that we get this object back
            resp.setQueue("dummyQueue");
            return resp;
          }

          @Override
          public FinishApplicationMasterResponse finishApplicationMaster
              (FinishApplicationMasterRequest request) throws YarnException,
              IOException {
            FinishApplicationMasterResponse resp = factory.newRecordInstance(
                FinishApplicationMasterResponse.class);
            // Dummy Entry to Assert that we get this object back
            resp.setIsUnregistered(false);
            return resp;
          }

          @Override
          public AllocateResponse allocate(AllocateRequest request) throws
              YarnException, IOException {
            AllocateResponse response = factory.newRecordInstance
                (AllocateResponse.class);
            response.setNumClusterNodes(12345);
            return response;
          }

          @Override
          public DistSchedRegisterResponse
              registerApplicationMasterForDistributedScheduling
              (RegisterApplicationMasterRequest request) throws
              YarnException, IOException {
            DistSchedRegisterResponse resp = factory.newRecordInstance(
                DistSchedRegisterResponse.class);
            resp.setContainerIdStart(54321L);
            return resp;
          }

          @Override
          public DistSchedAllocateResponse allocateForDistributedScheduling
              (AllocateRequest request) throws YarnException, IOException {
            DistSchedAllocateResponse resp =
                factory.newRecordInstance(DistSchedAllocateResponse.class);
            resp.setNodesForScheduling(
                Arrays.asList(NodeId.newInstance("h1", 1234)));
            return resp;
          }
        };
    Server server = service.getServer(rpc, conf, addr, null);
    server.start();

    // Verify that the DistributedSchedulingService can handle vanilla
    // ApplicationMasterProtocol clients
    RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class,
        ProtobufRpcEngine.class);
    ApplicationMasterProtocol ampProxy =
        (ApplicationMasterProtocol) rpc.getProxy(ApplicationMasterProtocol
            .class, NetUtils.getConnectAddress(server), conf);
    RegisterApplicationMasterResponse regResp = ampProxy.registerApplicationMaster(
        factory.newRecordInstance(RegisterApplicationMasterRequest.class));
    Assert.assertEquals("dummyQueue", regResp.getQueue());
    FinishApplicationMasterResponse finishResp = ampProxy
        .finishApplicationMaster(factory.newRecordInstance(
            FinishApplicationMasterRequest.class));
    Assert.assertEquals(false, finishResp.getIsUnregistered());
    AllocateResponse allocResp = ampProxy
        .allocate(factory.newRecordInstance(AllocateRequest.class));
    Assert.assertEquals(12345, allocResp.getNumClusterNodes());

    // Verify that the DistributedSchedulingService can handle the
    // DistributedSchedulerProtocol clients as well
    RPC.setProtocolEngine(conf, DistributedSchedulerProtocolPB.class,
        ProtobufRpcEngine.class);
    DistributedSchedulerProtocol dsProxy =
        (DistributedSchedulerProtocol) rpc.getProxy(DistributedSchedulerProtocol
            .class, NetUtils.getConnectAddress(server), conf);

    DistSchedRegisterResponse dsRegResp =
        dsProxy.registerApplicationMasterForDistributedScheduling(
            factory.newRecordInstance(RegisterApplicationMasterRequest.class));
    Assert.assertEquals(54321L, dsRegResp.getContainerIdStart());
    DistSchedAllocateResponse dsAllocResp =
        dsProxy.allocateForDistributedScheduling(
            factory.newRecordInstance(AllocateRequest.class));
    Assert.assertEquals(
        "h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
  }
}