YARN-1897. CLI and core support for signal container functionality. Contributed by Ming Ma
(cherry picked from commit 8f08532bde
)
This commit is contained in:
parent
3e3733437f
commit
53bddc410f
|
@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||||
|
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
||||||
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||||
|
@ -473,4 +474,10 @@ public class ResourceMgrDelegate extends YarnClient {
|
||||||
Priority priority) throws YarnException, IOException {
|
Priority priority) throws YarnException, IOException {
|
||||||
client.updateApplicationPriority(applicationId, priority);
|
client.updateApplicationPriority(applicationId, priority);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void signalContainer(ContainerId containerId, SignalContainerCommand command)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
client.signalContainer(containerId, command);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -112,6 +112,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
|
||||||
|
@ -453,6 +455,12 @@ public class TestClientRedirect {
|
||||||
IOException {
|
IOException {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SignalContainerResponse signalContainer(
|
||||||
|
SignalContainerRequest request) throws IOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class HistoryService extends AMService implements HSClientProtocol {
|
class HistoryService extends AMService implements HSClientProtocol {
|
||||||
|
|
|
@ -167,6 +167,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
|
|
||||||
YARN-1651. CapacityScheduler side changes to support container resize.
|
YARN-1651. CapacityScheduler side changes to support container resize.
|
||||||
(Wangda Tan via jianhe)
|
(Wangda Tan via jianhe)
|
||||||
|
|
||||||
|
YARN-1897. CLI and core support for signal container functionality.
|
||||||
|
(Ming Ma via xgong)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
|
|
|
@ -53,6 +53,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
@ -437,4 +439,32 @@ public interface ApplicationClientProtocol extends ApplicationBaseProtocol {
|
||||||
public UpdateApplicationPriorityResponse updateApplicationPriority(
|
public UpdateApplicationPriorityResponse updateApplicationPriority(
|
||||||
UpdateApplicationPriorityRequest request) throws YarnException,
|
UpdateApplicationPriorityRequest request) throws YarnException,
|
||||||
IOException;
|
IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>The interface used by clients to request the
|
||||||
|
* <code>ResourceManager</code> to signal a container. For example,
|
||||||
|
* the client can send command OUTPUT_THREAD_DUMP to dump threads of the
|
||||||
|
* container.</p>
|
||||||
|
*
|
||||||
|
* <p>The client, via {@link SignalContainerRequest} provides the
|
||||||
|
* id of the container and the signal command. </p>
|
||||||
|
*
|
||||||
|
* <p> In secure mode,the <code>ResourceManager</code> verifies access to the
|
||||||
|
* application before signaling the container.
|
||||||
|
* The user needs to have <code>MODIFY_APP</code> permission.</p>
|
||||||
|
*
|
||||||
|
* <p>Currently, the <code>ResourceManager</code> returns an empty response
|
||||||
|
* on success and throws an exception on rejecting the request.</p>
|
||||||
|
*
|
||||||
|
* @param request request to signal a container
|
||||||
|
* @return <code>ResourceManager</code> returns an empty response
|
||||||
|
* on success and throws an exception on rejecting the request
|
||||||
|
* @throws YarnException
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public SignalContainerResponse signalContainer(
|
||||||
|
SignalContainerRequest request) throws YarnException,
|
||||||
|
IOException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,78 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.api.protocolrecords;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
|
||||||
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>The request sent by the client to the <code>ResourceManager</code>
|
||||||
|
* or by the <code>ApplicationMaster</code> to the <code>NodeManager</code>
|
||||||
|
* to signal a container.
|
||||||
|
* @see SignalContainerCommand </p>
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Evolving
|
||||||
|
public abstract class SignalContainerRequest {
|
||||||
|
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public static SignalContainerRequest newInstance(ContainerId containerId,
|
||||||
|
SignalContainerCommand signalContainerCommand) {
|
||||||
|
SignalContainerRequest request =
|
||||||
|
Records.newRecord(SignalContainerRequest.class);
|
||||||
|
request.setContainerId(containerId);
|
||||||
|
request.setCommand(signalContainerCommand);
|
||||||
|
return request;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the <code>ContainerId</code> of the container to signal.
|
||||||
|
* @return <code>ContainerId</code> of the container to signal.
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public abstract ContainerId getContainerId();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the <code>ContainerId</code> of the container to signal.
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public abstract void setContainerId(ContainerId containerId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the <code>SignalContainerCommand</code> of the signal request.
|
||||||
|
* @return <code>SignalContainerCommand</code> of the signal request.
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public abstract SignalContainerCommand getCommand();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the <code>SignalContainerCommand</code> of the signal request.
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public abstract void setCommand(SignalContainerCommand command);
|
||||||
|
}
|
|
@ -0,0 +1,36 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.api.protocolrecords;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||||
|
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>The response sent by the <code>ResourceManager</code> to the client
|
||||||
|
* signalling a container.</p>
|
||||||
|
*
|
||||||
|
* <p>Currently it's empty.</p>
|
||||||
|
*
|
||||||
|
* @see ApplicationClientProtocol#signalContainer(SignalContainerRequest)
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Evolving
|
||||||
|
public abstract class SignalContainerResponse {
|
||||||
|
}
|
|
@ -0,0 +1,45 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.api.records;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Enumeration of various signal container commands.
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Evolving
|
||||||
|
public enum SignalContainerCommand {
|
||||||
|
/**
|
||||||
|
* Used to capture thread dump.
|
||||||
|
* On Linux, it is equivalent to SIGQUIT.
|
||||||
|
*/
|
||||||
|
OUTPUT_THREAD_DUMP,
|
||||||
|
|
||||||
|
/** Gracefully shutdown a container.
|
||||||
|
* On Linux, it is equivalent to SIGTERM.
|
||||||
|
*/
|
||||||
|
GRACEFUL_SHUTDOWN,
|
||||||
|
|
||||||
|
/** Forcefully shutdown a container.
|
||||||
|
* On Linux, it is equivalent to SIGKILL.
|
||||||
|
*/
|
||||||
|
FORCEFUL_SHUTDOWN,
|
||||||
|
}
|
|
@ -56,4 +56,5 @@ service ApplicationClientProtocolService {
|
||||||
rpc getLabelsToNodes (GetLabelsToNodesRequestProto) returns (GetLabelsToNodesResponseProto);
|
rpc getLabelsToNodes (GetLabelsToNodesRequestProto) returns (GetLabelsToNodesResponseProto);
|
||||||
rpc getClusterNodeLabels (GetClusterNodeLabelsRequestProto) returns (GetClusterNodeLabelsResponseProto);
|
rpc getClusterNodeLabels (GetClusterNodeLabelsRequestProto) returns (GetClusterNodeLabelsResponseProto);
|
||||||
rpc updateApplicationPriority (UpdateApplicationPriorityRequestProto) returns (UpdateApplicationPriorityResponseProto);
|
rpc updateApplicationPriority (UpdateApplicationPriorityRequestProto) returns (UpdateApplicationPriorityResponseProto);
|
||||||
|
rpc signalContainer(SignalContainerRequestProto) returns (SignalContainerResponseProto);
|
||||||
}
|
}
|
||||||
|
|
|
@ -417,6 +417,13 @@ message QueueUserACLInfoProto {
|
||||||
repeated QueueACLProto userAcls = 2;
|
repeated QueueACLProto userAcls = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
enum SignalContainerCommandProto {
|
||||||
|
OUTPUT_THREAD_DUMP = 1;
|
||||||
|
GRACEFUL_SHUTDOWN = 2;
|
||||||
|
FORCEFUL_SHUTDOWN = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////
|
||||||
////// From reservation_protocol /////////////////////////////////////
|
////// From reservation_protocol /////////////////////////////////////
|
||||||
////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////
|
||||||
|
|
|
@ -225,6 +225,14 @@ message UpdateApplicationPriorityRequestProto {
|
||||||
message UpdateApplicationPriorityResponseProto {
|
message UpdateApplicationPriorityResponseProto {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message SignalContainerRequestProto {
|
||||||
|
required ContainerIdProto container_id = 1;
|
||||||
|
required SignalContainerCommandProto command = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message SignalContainerResponseProto {
|
||||||
|
}
|
||||||
|
|
||||||
//////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////
|
||||||
/////// client_NM_Protocol ///////////////////////////
|
/////// client_NM_Protocol ///////////////////////////
|
||||||
//////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////
|
||||||
|
|
|
@ -38,8 +38,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
@ -56,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
|
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
|
||||||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
|
||||||
import org.apache.hadoop.yarn.api.records.Token;
|
import org.apache.hadoop.yarn.api.records.Token;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
||||||
|
@ -683,4 +682,18 @@ public abstract class YarnClient extends AbstractService {
|
||||||
@Unstable
|
@Unstable
|
||||||
public abstract void updateApplicationPriority(ApplicationId applicationId,
|
public abstract void updateApplicationPriority(ApplicationId applicationId,
|
||||||
Priority priority) throws YarnException, IOException;
|
Priority priority) throws YarnException, IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* Signal a container identified by given ID.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @param containerId
|
||||||
|
* {@link ContainerId} of the container that needs to be signaled
|
||||||
|
* @param command the signal container command
|
||||||
|
* @throws YarnException
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public abstract void signalContainer(ContainerId containerId,
|
||||||
|
SignalContainerCommand command) throws YarnException, IOException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -76,9 +76,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
@ -94,6 +94,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||||
|
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
|
||||||
import org.apache.hadoop.yarn.api.records.Token;
|
import org.apache.hadoop.yarn.api.records.Token;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
||||||
|
@ -826,4 +827,14 @@ public class YarnClientImpl extends YarnClient {
|
||||||
UpdateApplicationPriorityRequest.newInstance(applicationId, priority);
|
UpdateApplicationPriorityRequest.newInstance(applicationId, priority);
|
||||||
rmClient.updateApplicationPriority(request);
|
rmClient.updateApplicationPriority(request);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void signalContainer(ContainerId containerId,
|
||||||
|
SignalContainerCommand command)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
LOG.info("Signalling container " + containerId + " with command " + command);
|
||||||
|
SignalContainerRequest request =
|
||||||
|
SignalContainerRequest.newInstance(containerId, command);
|
||||||
|
rmClient.signalContainer(request);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,8 +42,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerReport;
|
import org.apache.hadoop.yarn.api.records.ContainerReport;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
|
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
|
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
|
||||||
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
|
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
|
||||||
|
@ -148,6 +150,12 @@ public class ApplicationCLI extends YarnCLI {
|
||||||
opts.addOption(HELP_CMD, false, "Displays help for all commands.");
|
opts.addOption(HELP_CMD, false, "Displays help for all commands.");
|
||||||
opts.getOption(STATUS_CMD).setArgName("Container ID");
|
opts.getOption(STATUS_CMD).setArgName("Container ID");
|
||||||
opts.getOption(LIST_CMD).setArgName("Application Attempt ID");
|
opts.getOption(LIST_CMD).setArgName("Application Attempt ID");
|
||||||
|
opts.addOption(SIGNAL_CMD, true,
|
||||||
|
"Signal the container. The available signal commands are " +
|
||||||
|
java.util.Arrays.asList(SignalContainerCommand.values()) +
|
||||||
|
" Default command is OUTPUT_THREAD_DUMP.");
|
||||||
|
opts.getOption(SIGNAL_CMD).setArgName("container ID [signal command]");
|
||||||
|
opts.getOption(SIGNAL_CMD).setArgs(3);
|
||||||
}
|
}
|
||||||
|
|
||||||
int exitCode = -1;
|
int exitCode = -1;
|
||||||
|
@ -254,6 +262,19 @@ public class ApplicationCLI extends YarnCLI {
|
||||||
}
|
}
|
||||||
updateApplicationPriority(cliParser.getOptionValue(APP_ID),
|
updateApplicationPriority(cliParser.getOptionValue(APP_ID),
|
||||||
cliParser.getOptionValue(UPDATE_PRIORITY));
|
cliParser.getOptionValue(UPDATE_PRIORITY));
|
||||||
|
} else if (cliParser.hasOption(SIGNAL_CMD)) {
|
||||||
|
if (args.length < 3 || args.length > 4) {
|
||||||
|
printUsage(title, opts);
|
||||||
|
return exitCode;
|
||||||
|
}
|
||||||
|
final String[] signalArgs = cliParser.getOptionValues(SIGNAL_CMD);
|
||||||
|
final String containerId = signalArgs[0];
|
||||||
|
SignalContainerCommand command =
|
||||||
|
SignalContainerCommand.OUTPUT_THREAD_DUMP;
|
||||||
|
if (signalArgs.length == 2) {
|
||||||
|
command = SignalContainerCommand.valueOf(signalArgs[1]);
|
||||||
|
}
|
||||||
|
signalContainer(containerId, command);
|
||||||
} else {
|
} else {
|
||||||
syserr.println("Invalid Command Usage : ");
|
syserr.println("Invalid Command Usage : ");
|
||||||
printUsage(title, opts);
|
printUsage(title, opts);
|
||||||
|
@ -261,6 +282,20 @@ public class ApplicationCLI extends YarnCLI {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Signals the containerId
|
||||||
|
*
|
||||||
|
* @param containerIdStr the container id
|
||||||
|
* @param command the signal command
|
||||||
|
* @throws YarnException
|
||||||
|
*/
|
||||||
|
private void signalContainer(String containerIdStr,
|
||||||
|
SignalContainerCommand command) throws YarnException, IOException {
|
||||||
|
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
|
||||||
|
sysout.println("Signalling container " + containerIdStr);
|
||||||
|
client.signalContainer(containerId, command);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* It prints the usage of the command
|
* It prints the usage of the command
|
||||||
*
|
*
|
||||||
|
|
|
@ -35,6 +35,7 @@ public abstract class YarnCLI extends Configured implements Tool {
|
||||||
public static final String KILL_CMD = "kill";
|
public static final String KILL_CMD = "kill";
|
||||||
public static final String MOVE_TO_QUEUE_CMD = "movetoqueue";
|
public static final String MOVE_TO_QUEUE_CMD = "movetoqueue";
|
||||||
public static final String HELP_CMD = "help";
|
public static final String HELP_CMD = "help";
|
||||||
|
public static final String SIGNAL_CMD = "signal";
|
||||||
protected PrintStream sysout;
|
protected PrintStream sysout;
|
||||||
protected PrintStream syserr;
|
protected PrintStream syserr;
|
||||||
protected YarnClient client;
|
protected YarnClient client;
|
||||||
|
|
|
@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||||
|
@ -97,6 +98,7 @@ import org.apache.hadoop.yarn.api.records.ReservationRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
|
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
|
||||||
import org.apache.hadoop.yarn.api.records.ReservationRequests;
|
import org.apache.hadoop.yarn.api.records.ReservationRequests;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.client.api.AHSClient;
|
import org.apache.hadoop.yarn.client.api.AHSClient;
|
||||||
|
@ -125,6 +127,7 @@ import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.ArgumentCaptor;
|
||||||
|
|
||||||
public class TestYarnClient {
|
public class TestYarnClient {
|
||||||
|
|
||||||
|
@ -1299,4 +1302,26 @@ public class TestYarnClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSignalContainer() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
@SuppressWarnings("resource")
|
||||||
|
final YarnClient client = new MockYarnClient();
|
||||||
|
client.init(conf);
|
||||||
|
client.start();
|
||||||
|
ApplicationId applicationId = ApplicationId.newInstance(1234, 5);
|
||||||
|
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
|
||||||
|
applicationId, 1);
|
||||||
|
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1);
|
||||||
|
SignalContainerCommand command = SignalContainerCommand.OUTPUT_THREAD_DUMP;
|
||||||
|
client.signalContainer(containerId, command);
|
||||||
|
final ArgumentCaptor<SignalContainerRequest> signalReqCaptor =
|
||||||
|
ArgumentCaptor.forClass(SignalContainerRequest.class);
|
||||||
|
verify(((MockYarnClient) client).getRMClient())
|
||||||
|
.signalContainer(signalReqCaptor.capture());
|
||||||
|
SignalContainerRequest request = signalReqCaptor.getValue();
|
||||||
|
Assert.assertEquals(containerId, request.getContainerId());
|
||||||
|
Assert.assertEquals(command, request.getCommand());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,6 +32,7 @@ import java.io.ByteArrayOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.PrintStream;
|
import java.io.PrintStream;
|
||||||
import java.io.PrintWriter;
|
import java.io.PrintWriter;
|
||||||
|
import java.io.UnsupportedEncodingException;
|
||||||
import java.text.DateFormat;
|
import java.text.DateFormat;
|
||||||
import java.text.SimpleDateFormat;
|
import java.text.SimpleDateFormat;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -40,6 +41,7 @@ import java.util.EnumSet;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import org.apache.commons.cli.Options;
|
import org.apache.commons.cli.Options;
|
||||||
import org.apache.commons.lang.time.DateFormatUtils;
|
import org.apache.commons.lang.time.DateFormatUtils;
|
||||||
|
@ -61,6 +63,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.QueueState;
|
import org.apache.hadoop.yarn.api.records.QueueState;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.client.api.YarnClient;
|
import org.apache.hadoop.yarn.client.api.YarnClient;
|
||||||
|
@ -82,6 +85,8 @@ public class TestYarnCLI {
|
||||||
private PrintStream sysOut;
|
private PrintStream sysOut;
|
||||||
ByteArrayOutputStream sysErrStream;
|
ByteArrayOutputStream sysErrStream;
|
||||||
private PrintStream sysErr;
|
private PrintStream sysErr;
|
||||||
|
private static final Pattern SPACES_PATTERN =
|
||||||
|
Pattern.compile("\\s+|\\n+|\\t+");
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() {
|
public void setup() {
|
||||||
|
@ -785,7 +790,7 @@ public class TestYarnCLI {
|
||||||
Assert.assertTrue(result == 0);
|
Assert.assertTrue(result == 0);
|
||||||
verify(spyCli).printUsage(any(String.class), any(Options.class));
|
verify(spyCli).printUsage(any(String.class), any(Options.class));
|
||||||
Assert.assertEquals(createContainerCLIHelpMessage(),
|
Assert.assertEquals(createContainerCLIHelpMessage(),
|
||||||
sysOutStream.toString());
|
normalize(sysOutStream.toString()));
|
||||||
|
|
||||||
sysOutStream.reset();
|
sysOutStream.reset();
|
||||||
ApplicationId applicationId = ApplicationId.newInstance(1234, 5);
|
ApplicationId applicationId = ApplicationId.newInstance(1234, 5);
|
||||||
|
@ -795,7 +800,7 @@ public class TestYarnCLI {
|
||||||
new String[] {"container", "-list", appAttemptId.toString(), "args" });
|
new String[] {"container", "-list", appAttemptId.toString(), "args" });
|
||||||
verify(spyCli).printUsage(any(String.class), any(Options.class));
|
verify(spyCli).printUsage(any(String.class), any(Options.class));
|
||||||
Assert.assertEquals(createContainerCLIHelpMessage(),
|
Assert.assertEquals(createContainerCLIHelpMessage(),
|
||||||
sysOutStream.toString());
|
normalize(sysOutStream.toString()));
|
||||||
|
|
||||||
sysOutStream.reset();
|
sysOutStream.reset();
|
||||||
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 7);
|
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 7);
|
||||||
|
@ -803,7 +808,7 @@ public class TestYarnCLI {
|
||||||
new String[] { "container", "-status", containerId.toString(), "args" });
|
new String[] { "container", "-status", containerId.toString(), "args" });
|
||||||
verify(spyCli).printUsage(any(String.class), any(Options.class));
|
verify(spyCli).printUsage(any(String.class), any(Options.class));
|
||||||
Assert.assertEquals(createContainerCLIHelpMessage(),
|
Assert.assertEquals(createContainerCLIHelpMessage(),
|
||||||
sysOutStream.toString());
|
normalize(sysOutStream.toString()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 5000)
|
@Test (timeout = 5000)
|
||||||
|
@ -1256,8 +1261,8 @@ public class TestYarnCLI {
|
||||||
sysOutStream.reset();
|
sysOutStream.reset();
|
||||||
result = cli.run(new String[] { "container", "-status" });
|
result = cli.run(new String[] { "container", "-status" });
|
||||||
Assert.assertEquals(result, -1);
|
Assert.assertEquals(result, -1);
|
||||||
Assert.assertEquals(String.format("Missing argument for options%n%1s",
|
Assert.assertEquals(String.format("Missing argument for options %1s",
|
||||||
createContainerCLIHelpMessage()), sysOutStream.toString());
|
createContainerCLIHelpMessage()), normalize(sysOutStream.toString()));
|
||||||
|
|
||||||
sysOutStream.reset();
|
sysOutStream.reset();
|
||||||
NodeCLI nodeCLI = new NodeCLI();
|
NodeCLI nodeCLI = new NodeCLI();
|
||||||
|
@ -1538,10 +1543,17 @@ public class TestYarnCLI {
|
||||||
pw.println("usage: container");
|
pw.println("usage: container");
|
||||||
pw.println(" -help Displays help for all commands.");
|
pw.println(" -help Displays help for all commands.");
|
||||||
pw.println(" -list <Application Attempt ID> List containers for application attempt.");
|
pw.println(" -list <Application Attempt ID> List containers for application attempt.");
|
||||||
|
pw.println(" -signal <container ID [signal command]> Signal the container.");
|
||||||
|
pw.println("The available signal commands are ");
|
||||||
|
pw.println(java.util.Arrays.asList(SignalContainerCommand.values()));
|
||||||
|
pw.println(" Default command is OUTPUT_THREAD_DUMP.");
|
||||||
pw.println(" -status <Container ID> Prints the status of the container.");
|
pw.println(" -status <Container ID> Prints the status of the container.");
|
||||||
pw.close();
|
pw.close();
|
||||||
String appsHelpStr = baos.toString("UTF-8");
|
try {
|
||||||
return appsHelpStr;
|
return normalize(baos.toString("UTF-8"));
|
||||||
|
} catch (UnsupportedEncodingException infeasible) {
|
||||||
|
return infeasible.toString();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private String createNodeCLIHelpMessage() throws IOException {
|
private String createNodeCLIHelpMessage() throws IOException {
|
||||||
|
@ -1560,4 +1572,8 @@ public class TestYarnCLI {
|
||||||
String nodesHelpStr = baos.toString("UTF-8");
|
String nodesHelpStr = baos.toString("UTF-8");
|
||||||
return nodesHelpStr;
|
return nodesHelpStr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static String normalize(String s) {
|
||||||
|
return SPACES_PATTERN.matcher(s).replaceAll(" "); // single space
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -77,6 +77,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl;
|
||||||
|
@ -125,6 +127,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateReque
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateResponsePBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateResponsePBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityRequestPBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityRequestPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityResponsePBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityResponsePBImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
@ -527,4 +531,18 @@ public class ApplicationClientProtocolPBClientImpl implements ApplicationClientP
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SignalContainerResponse signalContainer(
|
||||||
|
SignalContainerRequest request) throws YarnException, IOException {
|
||||||
|
YarnServiceProtos.SignalContainerRequestProto requestProto =
|
||||||
|
((SignalContainerRequestPBImpl) request).getProto();
|
||||||
|
try {
|
||||||
|
return new SignalContainerResponsePBImpl(
|
||||||
|
proxy.signalContainer(null, requestProto));
|
||||||
|
} catch (ServiceException e) {
|
||||||
|
RPCUtil.unwrapAndThrowException(e);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenResponsePBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenResponsePBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationAttemptReportRequestPBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationAttemptReportRequestPBImpl;
|
||||||
|
@ -102,6 +103,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPrior
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptReportRequestProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptReportRequestProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptReportResponseProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptReportResponseProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptsRequestProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptsRequestProto;
|
||||||
|
@ -140,8 +142,11 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionReque
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionResponseProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionResponseProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateRequestProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateRequestProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateResponseProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateResponseProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityRequestProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityRequestProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProto;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto;
|
||||||
|
|
||||||
|
@ -529,4 +534,18 @@ public class ApplicationClientProtocolPBServiceImpl implements ApplicationClient
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SignalContainerResponseProto signalContainer(RpcController controller,
|
||||||
|
YarnServiceProtos.SignalContainerRequestProto proto) throws ServiceException {
|
||||||
|
SignalContainerRequestPBImpl request = new SignalContainerRequestPBImpl(proto);
|
||||||
|
try {
|
||||||
|
SignalContainerResponse response = real.signalContainer(request);
|
||||||
|
return ((SignalContainerResponsePBImpl)response).getProto();
|
||||||
|
} catch (YarnException e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,169 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
|
||||||
|
|
||||||
|
|
||||||
|
import com.google.protobuf.TextFormat;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnProtos.SignalContainerCommandProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProtoOrBuilder;
|
||||||
|
|
||||||
|
|
||||||
|
public class SignalContainerRequestPBImpl
|
||||||
|
extends SignalContainerRequest {
|
||||||
|
SignalContainerRequestProto proto =
|
||||||
|
SignalContainerRequestProto.getDefaultInstance();
|
||||||
|
SignalContainerRequestProto.Builder builder = null;
|
||||||
|
boolean viaProto = false;
|
||||||
|
|
||||||
|
private ContainerId containerId;
|
||||||
|
private SignalContainerCommand command = null;
|
||||||
|
|
||||||
|
private static SignalContainerCommand convertFromProtoFormat(
|
||||||
|
SignalContainerCommandProto p) {
|
||||||
|
return SignalContainerCommand.valueOf(p.name());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static SignalContainerCommandProto convertToProtoFormat(
|
||||||
|
SignalContainerCommand p) {
|
||||||
|
return SignalContainerCommandProto.valueOf(p.name());
|
||||||
|
}
|
||||||
|
|
||||||
|
public SignalContainerRequestPBImpl() {
|
||||||
|
builder = SignalContainerRequestProto.newBuilder();
|
||||||
|
}
|
||||||
|
|
||||||
|
public SignalContainerRequestPBImpl(SignalContainerRequestProto proto) {
|
||||||
|
this.proto = proto;
|
||||||
|
viaProto = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public SignalContainerRequestProto getProto() {
|
||||||
|
mergeLocalToProto();
|
||||||
|
proto = viaProto ? proto : builder.build();
|
||||||
|
viaProto = true;
|
||||||
|
return proto;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void mergeLocalToBuilder() {
|
||||||
|
if (this.containerId != null) {
|
||||||
|
builder.setContainerId(convertToProtoFormat(this.containerId));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.command != null) {
|
||||||
|
builder.setCommand(convertToProtoFormat(this.command));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void mergeLocalToProto() {
|
||||||
|
if (viaProto)
|
||||||
|
maybeInitBuilder();
|
||||||
|
mergeLocalToBuilder();
|
||||||
|
proto = builder.build();
|
||||||
|
viaProto = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void maybeInitBuilder() {
|
||||||
|
if (viaProto || builder == null) {
|
||||||
|
builder = SignalContainerRequestProto.newBuilder(proto);
|
||||||
|
}
|
||||||
|
viaProto = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return getProto().hashCode();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object other) {
|
||||||
|
if (other == null)
|
||||||
|
return false;
|
||||||
|
if (other.getClass().isAssignableFrom(this.getClass())) {
|
||||||
|
return this.getProto().equals(this.getClass().cast(other).getProto());
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return TextFormat.shortDebugString(getProto());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ContainerId getContainerId() {
|
||||||
|
SignalContainerRequestProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
if (this.containerId != null) {
|
||||||
|
return this.containerId;
|
||||||
|
}
|
||||||
|
if (!p.hasContainerId()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
this.containerId = convertFromProtoFormat(p.getContainerId());
|
||||||
|
return this.containerId;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setContainerId(ContainerId containerId) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
if (containerId == null) {
|
||||||
|
builder.clearContainerId();
|
||||||
|
}
|
||||||
|
this.containerId = containerId;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initCommand() {
|
||||||
|
if (this.command != null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
SignalContainerRequestProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
if(p.hasCommand()) {
|
||||||
|
this.command = convertFromProtoFormat(p.getCommand());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SignalContainerCommand getCommand() {
|
||||||
|
initCommand();
|
||||||
|
return command;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setCommand(SignalContainerCommand command) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
if (command == null) {
|
||||||
|
builder.clearCommand();
|
||||||
|
}
|
||||||
|
this.command = command;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
|
||||||
|
return new ContainerIdPBImpl(p);
|
||||||
|
}
|
||||||
|
|
||||||
|
private ContainerIdProto convertToProtoFormat(ContainerId t) {
|
||||||
|
return ((ContainerIdPBImpl)t).getProto();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,44 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
|
||||||
|
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProto;
|
||||||
|
|
||||||
|
public class SignalContainerResponsePBImpl extends SignalContainerResponse {
|
||||||
|
SignalContainerResponseProto proto = SignalContainerResponseProto.getDefaultInstance();
|
||||||
|
SignalContainerResponseProto.Builder builder = null;
|
||||||
|
boolean viaProto = false;
|
||||||
|
|
||||||
|
public SignalContainerResponsePBImpl() {
|
||||||
|
builder = SignalContainerResponseProto.newBuilder();
|
||||||
|
}
|
||||||
|
|
||||||
|
public SignalContainerResponsePBImpl(SignalContainerResponseProto proto) {
|
||||||
|
this.proto = proto;
|
||||||
|
viaProto = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public SignalContainerResponseProto getProto() {
|
||||||
|
proto = viaProto ? proto : builder.build();
|
||||||
|
viaProto = true;
|
||||||
|
return proto;
|
||||||
|
}
|
||||||
|
}
|
|
@ -23,6 +23,7 @@ import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
@ -56,6 +57,8 @@ public interface NodeHeartbeatResponse {
|
||||||
|
|
||||||
void addAllApplicationsToCleanup(List<ApplicationId> applications);
|
void addAllApplicationsToCleanup(List<ApplicationId> applications);
|
||||||
|
|
||||||
|
List<SignalContainerRequest> getContainersToSignalList();
|
||||||
|
void addAllContainersToSignal(List<SignalContainerRequest> containers);
|
||||||
long getNextHeartBeatInterval();
|
long getNextHeartBeatInterval();
|
||||||
void setNextHeartBeatInterval(long nextHeartBeatInterval);
|
void setNextHeartBeatInterval(long nextHeartBeatInterval);
|
||||||
|
|
||||||
|
|
|
@ -26,6 +26,8 @@ import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
@ -37,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
|
||||||
|
@ -62,8 +65,8 @@ public class NodeHeartbeatResponsePBImpl extends
|
||||||
|
|
||||||
private MasterKey containerTokenMasterKey = null;
|
private MasterKey containerTokenMasterKey = null;
|
||||||
private MasterKey nmTokenMasterKey = null;
|
private MasterKey nmTokenMasterKey = null;
|
||||||
|
|
||||||
private List<Container> containersToDecrease = null;
|
private List<Container> containersToDecrease = null;
|
||||||
|
private List<SignalContainerRequest> containersToSignal = null;
|
||||||
|
|
||||||
public NodeHeartbeatResponsePBImpl() {
|
public NodeHeartbeatResponsePBImpl() {
|
||||||
builder = NodeHeartbeatResponseProto.newBuilder();
|
builder = NodeHeartbeatResponseProto.newBuilder();
|
||||||
|
@ -105,6 +108,9 @@ public class NodeHeartbeatResponsePBImpl extends
|
||||||
if (this.containersToDecrease != null) {
|
if (this.containersToDecrease != null) {
|
||||||
addContainersToDecreaseToProto();
|
addContainersToDecreaseToProto();
|
||||||
}
|
}
|
||||||
|
if (this.containersToSignal != null) {
|
||||||
|
addContainersToSignalToProto();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addSystemCredentialsToProto() {
|
private void addSystemCredentialsToProto() {
|
||||||
|
@ -571,5 +577,75 @@ public class NodeHeartbeatResponsePBImpl extends
|
||||||
maybeInitBuilder();
|
maybeInitBuilder();
|
||||||
this.builder.setAreNodeLabelsAcceptedByRM(areNodeLabelsAcceptedByRM);
|
this.builder.setAreNodeLabelsAcceptedByRM(areNodeLabelsAcceptedByRM);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<SignalContainerRequest> getContainersToSignalList() {
|
||||||
|
initContainersToSignal();
|
||||||
|
return this.containersToSignal;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initContainersToSignal() {
|
||||||
|
if (this.containersToSignal != null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
List<SignalContainerRequestProto> list = p.getContainersToSignalList();
|
||||||
|
this.containersToSignal = new ArrayList<SignalContainerRequest>();
|
||||||
|
|
||||||
|
for (SignalContainerRequestProto c : list) {
|
||||||
|
this.containersToSignal.add(convertFromProtoFormat(c));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addAllContainersToSignal(
|
||||||
|
final List<SignalContainerRequest> containersToSignal) {
|
||||||
|
if (containersToSignal == null)
|
||||||
|
return;
|
||||||
|
initContainersToSignal();
|
||||||
|
this.containersToSignal.addAll(containersToSignal);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addContainersToSignalToProto() {
|
||||||
|
maybeInitBuilder();
|
||||||
|
builder.clearContainersToSignal();
|
||||||
|
if (containersToSignal == null)
|
||||||
|
return;
|
||||||
|
|
||||||
|
Iterable<SignalContainerRequestProto> iterable =
|
||||||
|
new Iterable<SignalContainerRequestProto>() {
|
||||||
|
@Override
|
||||||
|
public Iterator<SignalContainerRequestProto> iterator() {
|
||||||
|
return new Iterator<SignalContainerRequestProto>() {
|
||||||
|
Iterator<SignalContainerRequest> iter = containersToSignal.iterator();
|
||||||
|
@Override
|
||||||
|
public boolean hasNext() {
|
||||||
|
return iter.hasNext();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SignalContainerRequestProto next() {
|
||||||
|
return convertToProtoFormat(iter.next());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remove() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
builder.addAllContainersToSignal(iterable);
|
||||||
|
}
|
||||||
|
|
||||||
|
private SignalContainerRequestPBImpl convertFromProtoFormat(
|
||||||
|
SignalContainerRequestProto p) {
|
||||||
|
return new SignalContainerRequestPBImpl(p);
|
||||||
|
}
|
||||||
|
|
||||||
|
private SignalContainerRequestProto convertToProtoFormat(
|
||||||
|
SignalContainerRequest t) {
|
||||||
|
return ((SignalContainerRequestPBImpl)t).getProto();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,6 +24,7 @@ package hadoop.yarn;
|
||||||
|
|
||||||
import "yarn_protos.proto";
|
import "yarn_protos.proto";
|
||||||
import "yarn_server_common_protos.proto";
|
import "yarn_server_common_protos.proto";
|
||||||
|
import "yarn_service_protos.proto";
|
||||||
|
|
||||||
message NodeLabelsProto {
|
message NodeLabelsProto {
|
||||||
repeated NodeLabelProto nodeLabels = 1;
|
repeated NodeLabelProto nodeLabels = 1;
|
||||||
|
@ -83,6 +84,7 @@ message NodeHeartbeatResponseProto {
|
||||||
repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10;
|
repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10;
|
||||||
optional bool areNodeLabelsAcceptedByRM = 11 [default = false];
|
optional bool areNodeLabelsAcceptedByRM = 11 [default = false];
|
||||||
repeated ContainerProto containers_to_decrease = 12;
|
repeated ContainerProto containers_to_decrease = 12;
|
||||||
|
repeated SignalContainerRequestProto containers_to_signal = 13;
|
||||||
}
|
}
|
||||||
|
|
||||||
message SystemCredentialsForAppsProto {
|
message SystemCredentialsForAppsProto {
|
||||||
|
|
|
@ -0,0 +1,37 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
||||||
|
|
||||||
|
public class CMgrSignalContainersEvent extends ContainerManagerEvent {
|
||||||
|
|
||||||
|
private List<SignalContainerRequest> containerToSignal;
|
||||||
|
|
||||||
|
public CMgrSignalContainersEvent(List<SignalContainerRequest> containerToSignal) {
|
||||||
|
super(ContainerManagerEventType.SIGNAL_CONTAINERS);
|
||||||
|
this.containerToSignal = containerToSignal;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<SignalContainerRequest> getContainersToSignal() {
|
||||||
|
return this.containerToSignal;
|
||||||
|
}
|
||||||
|
}
|
|
@ -21,5 +21,6 @@ package org.apache.hadoop.yarn.server.nodemanager;
|
||||||
public enum ContainerManagerEventType {
|
public enum ContainerManagerEventType {
|
||||||
FINISH_APPS,
|
FINISH_APPS,
|
||||||
FINISH_CONTAINERS,
|
FINISH_CONTAINERS,
|
||||||
DECREASE_CONTAINERS_RESOURCE
|
DECREASE_CONTAINERS_RESOURCE,
|
||||||
|
SIGNAL_CONTAINERS
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.util.VersionUtil;
|
import org.apache.hadoop.util.VersionUtil;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
|
@ -800,6 +801,16 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
new CMgrDecreaseContainersResourceEvent(containersToDecrease)
|
new CMgrDecreaseContainersResourceEvent(containersToDecrease)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SignalContainer request originally comes from end users via
|
||||||
|
// ClientRMProtocol's SignalContainer. Forward the request to
|
||||||
|
// ContainerManager which will dispatch the event to ContainerLauncher.
|
||||||
|
List<SignalContainerRequest> containersToSignal = response
|
||||||
|
.getContainersToSignalList();
|
||||||
|
if (containersToSignal.size() != 0) {
|
||||||
|
dispatcher.getEventHandler().handle(
|
||||||
|
new CMgrSignalContainersEvent(containersToSignal));
|
||||||
|
}
|
||||||
} catch (ConnectException e) {
|
} catch (ConnectException e) {
|
||||||
//catch and throw the exception if tried MAX wait time to connect RM
|
//catch and throw the exception if tried MAX wait time to connect RM
|
||||||
dispatcher.getEventHandler().handle(
|
dispatcher.getEventHandler().handle(
|
||||||
|
|
|
@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
|
||||||
|
@ -97,6 +98,7 @@ import org.apache.hadoop.yarn.server.api.ContainerType;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||||
|
@ -121,6 +123,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.SignalContainersLauncherEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
|
||||||
|
@ -1349,6 +1352,23 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
case SIGNAL_CONTAINERS:
|
||||||
|
CMgrSignalContainersEvent containersSignalEvent =
|
||||||
|
(CMgrSignalContainersEvent) event;
|
||||||
|
for (SignalContainerRequest request : containersSignalEvent
|
||||||
|
.getContainersToSignal()) {
|
||||||
|
ContainerId containerId = request.getContainerId();
|
||||||
|
Container container = this.context.getContainers().get(containerId);
|
||||||
|
if (container != null) {
|
||||||
|
LOG.info(containerId + " signal request by ResourceManager.");
|
||||||
|
this.dispatcher.getEventHandler().handle(
|
||||||
|
new SignalContainersLauncherEvent(container,
|
||||||
|
request.getCommand()));
|
||||||
|
} else {
|
||||||
|
LOG.info("Container " + containerId + " no longer exists");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
throw new YarnRuntimeException(
|
throw new YarnRuntimeException(
|
||||||
"Got an unknown ContainerManagerEvent type: " + event.getType());
|
"Got an unknown ContainerManagerEvent type: " + event.getType());
|
||||||
|
|
|
@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
|
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||||
|
@ -461,6 +462,100 @@ public class ContainerLaunch implements Callable<Integer> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send a signal to the container.
|
||||||
|
*
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("unchecked") // dispatcher not typed
|
||||||
|
public void signalContainer(SignalContainerCommand command)
|
||||||
|
throws IOException {
|
||||||
|
ContainerId containerId =
|
||||||
|
container.getContainerTokenIdentifier().getContainerID();
|
||||||
|
String containerIdStr = ConverterUtils.toString(containerId);
|
||||||
|
String user = container.getUser();
|
||||||
|
Signal signal = translateCommandToSignal(command);
|
||||||
|
if (signal.equals(Signal.NULL)) {
|
||||||
|
LOG.info("ignore signal command " + command);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("Sending signal " + command + " to container " + containerIdStr);
|
||||||
|
|
||||||
|
boolean alreadyLaunched = !shouldLaunchContainer.compareAndSet(false, true);
|
||||||
|
if (!alreadyLaunched) {
|
||||||
|
LOG.info("Container " + containerIdStr + " not launched."
|
||||||
|
+ " Not sending the signal");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Getting pid for container " + containerIdStr
|
||||||
|
+ " to send signal to from pid file "
|
||||||
|
+ (pidFilePath != null ? pidFilePath.toString() : "null"));
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// get process id from pid file if available
|
||||||
|
// else if shell is still active, get it from the shell
|
||||||
|
String processId = null;
|
||||||
|
if (pidFilePath != null) {
|
||||||
|
processId = getContainerPid(pidFilePath);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (processId != null) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Sending signal to pid " + processId
|
||||||
|
+ " as user " + user
|
||||||
|
+ " for container " + containerIdStr);
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean result = exec.signalContainer(
|
||||||
|
new ContainerSignalContext.Builder()
|
||||||
|
.setContainer(container)
|
||||||
|
.setUser(user)
|
||||||
|
.setPid(processId)
|
||||||
|
.setSignal(signal)
|
||||||
|
.build());
|
||||||
|
|
||||||
|
String diagnostics = "Sent signal " + command
|
||||||
|
+ " (" + signal + ") to pid " + processId
|
||||||
|
+ " as user " + user
|
||||||
|
+ " for container " + containerIdStr
|
||||||
|
+ ", result=" + (result ? "success" : "failed");
|
||||||
|
LOG.info(diagnostics);
|
||||||
|
|
||||||
|
dispatcher.getEventHandler().handle(
|
||||||
|
new ContainerDiagnosticsUpdateEvent(containerId, diagnostics));
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
String message =
|
||||||
|
"Exception when sending signal to container " + containerIdStr
|
||||||
|
+ ": " + StringUtils.stringifyException(e);
|
||||||
|
LOG.warn(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public static Signal translateCommandToSignal(
|
||||||
|
SignalContainerCommand command) {
|
||||||
|
Signal signal = Signal.NULL;
|
||||||
|
switch (command) {
|
||||||
|
case OUTPUT_THREAD_DUMP:
|
||||||
|
// TODO for windows support.
|
||||||
|
signal = Shell.WINDOWS ? Signal.NULL: Signal.QUIT;
|
||||||
|
break;
|
||||||
|
case GRACEFUL_SHUTDOWN:
|
||||||
|
signal = Signal.TERM;
|
||||||
|
break;
|
||||||
|
case FORCEFUL_SHUTDOWN:
|
||||||
|
signal = Signal.KILL;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
return signal;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Loop through for a time-bounded interval waiting to
|
* Loop through for a time-bounded interval waiting to
|
||||||
* read the process id from a file generated by a running process.
|
* read the process id from a file generated by a running process.
|
||||||
|
|
|
@ -142,7 +142,23 @@ public class ContainersLauncher extends AbstractService
|
||||||
+ ". Ignoring.");
|
+ ". Ignoring.");
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
case SIGNAL_CONTAINER:
|
||||||
|
SignalContainersLauncherEvent signalEvent =
|
||||||
|
(SignalContainersLauncherEvent) event;
|
||||||
|
ContainerLaunch runningContainer = running.get(containerId);
|
||||||
|
if (runningContainer == null) {
|
||||||
|
// Container not launched. So nothing needs to be done.
|
||||||
|
LOG.info("Container " + containerId + " not running, nothing to signal.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
runningContainer.signalContainer(signalEvent.getCommand());
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Got exception while signaling container " + containerId
|
||||||
|
+ " with command " + signalEvent.getCommand());
|
||||||
|
}
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,4 +22,5 @@ public enum ContainersLauncherEventType {
|
||||||
LAUNCH_CONTAINER,
|
LAUNCH_CONTAINER,
|
||||||
RECOVER_CONTAINER,
|
RECOVER_CONTAINER,
|
||||||
CLEANUP_CONTAINER, // The process(grp) itself.
|
CLEANUP_CONTAINER, // The process(grp) itself.
|
||||||
|
SIGNAL_CONTAINER,
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,38 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
|
|
||||||
|
// This event can be triggered by one of the following flows
|
||||||
|
// WebUI -> Container
|
||||||
|
// CLI -> RM -> NM
|
||||||
|
public class SignalContainersLauncherEvent extends ContainersLauncherEvent{
|
||||||
|
|
||||||
|
private final SignalContainerCommand command;
|
||||||
|
public SignalContainersLauncherEvent(Container container,
|
||||||
|
SignalContainerCommand command) {
|
||||||
|
super(container, ContainersLauncherEventType.SIGNAL_CONTAINER);
|
||||||
|
this.command = command;
|
||||||
|
}
|
||||||
|
public SignalContainerCommand getCommand() {
|
||||||
|
return command;
|
||||||
|
}
|
||||||
|
}
|
|
@ -222,6 +222,42 @@ public class TestContainerManagerWithLCE extends TestContainerManager {
|
||||||
super.testChangeContainerResource();
|
super.testChangeContainerResource();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void testOutputThreadDumpSignal() throws IOException,
|
||||||
|
InterruptedException, YarnException {
|
||||||
|
// Don't run the test if the binary is not available.
|
||||||
|
if (!shouldRunTest()) {
|
||||||
|
LOG.info("LCE binary path is not passed. Not running the test");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
LOG.info("Running testOutputThreadDumpSignal");
|
||||||
|
super.testOutputThreadDumpSignal();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void testGracefulShutdownSignal() throws IOException,
|
||||||
|
InterruptedException, YarnException {
|
||||||
|
// Don't run the test if the binary is not available.
|
||||||
|
if (!shouldRunTest()) {
|
||||||
|
LOG.info("LCE binary path is not passed. Not running the test");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
LOG.info("Running testGracefulShutdownSignal");
|
||||||
|
super.testGracefulShutdownSignal();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void testForcefulShutdownSignal() throws IOException,
|
||||||
|
InterruptedException, YarnException {
|
||||||
|
// Don't run the test if the binary is not available.
|
||||||
|
if (!shouldRunTest()) {
|
||||||
|
LOG.info("LCE binary path is not passed. Not running the test");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
LOG.info("Running testForcefulShutdownSignal");
|
||||||
|
super.testForcefulShutdownSignal();
|
||||||
|
}
|
||||||
|
|
||||||
private boolean shouldRunTest() {
|
private boolean shouldRunTest() {
|
||||||
return System
|
return System
|
||||||
.getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null;
|
.getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null;
|
||||||
|
|
|
@ -61,6 +61,7 @@ import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
|
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.service.Service.STATE;
|
import org.apache.hadoop.service.Service.STATE;
|
||||||
import org.apache.hadoop.service.ServiceOperations;
|
import org.apache.hadoop.service.ServiceOperations;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
@ -69,6 +70,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
|
||||||
import org.apache.hadoop.yarn.api.records.Token;
|
import org.apache.hadoop.yarn.api.records.Token;
|
||||||
import org.apache.hadoop.yarn.client.RMProxy;
|
import org.apache.hadoop.yarn.client.RMProxy;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
@ -166,9 +168,11 @@ public class TestNodeStatusUpdater {
|
||||||
private class MyResourceTracker implements ResourceTracker {
|
private class MyResourceTracker implements ResourceTracker {
|
||||||
|
|
||||||
private final Context context;
|
private final Context context;
|
||||||
|
private boolean signalContainer;
|
||||||
|
|
||||||
public MyResourceTracker(Context context) {
|
public MyResourceTracker(Context context, boolean signalContainer) {
|
||||||
this.context = context;
|
this.context = context;
|
||||||
|
this.signalContainer = signalContainer;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -222,17 +226,19 @@ public class TestNodeStatusUpdater {
|
||||||
nodeStatus.setResponseId(heartBeatID++);
|
nodeStatus.setResponseId(heartBeatID++);
|
||||||
Map<ApplicationId, List<ContainerStatus>> appToContainers =
|
Map<ApplicationId, List<ContainerStatus>> appToContainers =
|
||||||
getAppToContainerStatusMap(nodeStatus.getContainersStatuses());
|
getAppToContainerStatusMap(nodeStatus.getContainersStatuses());
|
||||||
|
List<SignalContainerRequest> containersToSignal = null;
|
||||||
|
|
||||||
ApplicationId appId1 = ApplicationId.newInstance(0, 1);
|
ApplicationId appId1 = ApplicationId.newInstance(0, 1);
|
||||||
ApplicationId appId2 = ApplicationId.newInstance(0, 2);
|
ApplicationId appId2 = ApplicationId.newInstance(0, 2);
|
||||||
|
|
||||||
|
ContainerId firstContainerID = null;
|
||||||
if (heartBeatID == 1) {
|
if (heartBeatID == 1) {
|
||||||
Assert.assertEquals(0, nodeStatus.getContainersStatuses().size());
|
Assert.assertEquals(0, nodeStatus.getContainersStatuses().size());
|
||||||
|
|
||||||
// Give a container to the NM.
|
// Give a container to the NM.
|
||||||
ApplicationAttemptId appAttemptID =
|
ApplicationAttemptId appAttemptID =
|
||||||
ApplicationAttemptId.newInstance(appId1, 0);
|
ApplicationAttemptId.newInstance(appId1, 0);
|
||||||
ContainerId firstContainerID =
|
firstContainerID =
|
||||||
ContainerId.newContainerId(appAttemptID, heartBeatID);
|
ContainerId.newContainerId(appAttemptID, heartBeatID);
|
||||||
ContainerLaunchContext launchContext = recordFactory
|
ContainerLaunchContext launchContext = recordFactory
|
||||||
.newRecordInstance(ContainerLaunchContext.class);
|
.newRecordInstance(ContainerLaunchContext.class);
|
||||||
|
@ -259,6 +265,15 @@ public class TestNodeStatusUpdater {
|
||||||
this.context.getContainers();
|
this.context.getContainers();
|
||||||
Assert.assertEquals(1, activeContainers.size());
|
Assert.assertEquals(1, activeContainers.size());
|
||||||
|
|
||||||
|
if (this.signalContainer) {
|
||||||
|
containersToSignal = new ArrayList<SignalContainerRequest>();
|
||||||
|
SignalContainerRequest signalReq = recordFactory
|
||||||
|
.newRecordInstance(SignalContainerRequest.class);
|
||||||
|
signalReq.setContainerId(firstContainerID);
|
||||||
|
signalReq.setCommand(SignalContainerCommand.OUTPUT_THREAD_DUMP);
|
||||||
|
containersToSignal.add(signalReq);
|
||||||
|
}
|
||||||
|
|
||||||
// Give another container to the NM.
|
// Give another container to the NM.
|
||||||
ApplicationAttemptId appAttemptID =
|
ApplicationAttemptId appAttemptID =
|
||||||
ApplicationAttemptId.newInstance(appId2, 0);
|
ApplicationAttemptId.newInstance(appId2, 0);
|
||||||
|
@ -295,6 +310,9 @@ public class TestNodeStatusUpdater {
|
||||||
NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
|
NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
|
||||||
newNodeHeartbeatResponse(heartBeatID, null, null, null, null, null,
|
newNodeHeartbeatResponse(heartBeatID, null, null, null, null, null,
|
||||||
1000L);
|
1000L);
|
||||||
|
if (containersToSignal != null) {
|
||||||
|
nhResponse.addAllContainersToSignal(containersToSignal);
|
||||||
|
}
|
||||||
return nhResponse;
|
return nhResponse;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -306,15 +324,40 @@ public class TestNodeStatusUpdater {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private class MyContainerManager extends ContainerManagerImpl {
|
||||||
|
public boolean signaled = false;
|
||||||
|
|
||||||
|
public MyContainerManager(Context context, ContainerExecutor exec,
|
||||||
|
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
|
||||||
|
NodeManagerMetrics metrics,
|
||||||
|
LocalDirsHandlerService dirsHandler) {
|
||||||
|
super(context, exec, deletionContext, nodeStatusUpdater,
|
||||||
|
metrics, dirsHandler);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handle(ContainerManagerEvent event) {
|
||||||
|
if (event.getType() == ContainerManagerEventType.SIGNAL_CONTAINERS) {
|
||||||
|
signaled = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl {
|
private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl {
|
||||||
public ResourceTracker resourceTracker;
|
public ResourceTracker resourceTracker;
|
||||||
private Context context;
|
private Context context;
|
||||||
|
|
||||||
public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
|
public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
|
||||||
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
|
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
|
||||||
|
this(context, dispatcher, healthChecker, metrics, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
|
||||||
|
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
|
||||||
|
boolean signalContainer) {
|
||||||
super(context, dispatcher, healthChecker, metrics);
|
super(context, dispatcher, healthChecker, metrics);
|
||||||
this.context = context;
|
this.context = context;
|
||||||
resourceTracker = new MyResourceTracker(this.context);
|
resourceTracker = new MyResourceTracker(this.context, signalContainer);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1547,6 +1590,66 @@ public class TestNodeStatusUpdater {
|
||||||
nm.stop();
|
nm.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
//Verify that signalContainer request can be dispatched from
|
||||||
|
//NodeStatusUpdaterImpl to ContainerManagerImpl.
|
||||||
|
@Test
|
||||||
|
public void testSignalContainerToContainerManager() throws Exception {
|
||||||
|
nm = new NodeManager() {
|
||||||
|
@Override
|
||||||
|
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
||||||
|
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
|
||||||
|
return new MyNodeStatusUpdater(
|
||||||
|
context, dispatcher, healthChecker, metrics, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ContainerManagerImpl createContainerManager(Context context,
|
||||||
|
ContainerExecutor exec, DeletionService del,
|
||||||
|
NodeStatusUpdater nodeStatusUpdater,
|
||||||
|
ApplicationACLsManager aclsManager,
|
||||||
|
LocalDirsHandlerService diskhandler) {
|
||||||
|
return new MyContainerManager(context, exec, del, nodeStatusUpdater,
|
||||||
|
metrics, diskhandler);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
YarnConfiguration conf = createNMConfig();
|
||||||
|
nm.init(conf);
|
||||||
|
nm.start();
|
||||||
|
|
||||||
|
System.out.println(" ----- thread already started.."
|
||||||
|
+ nm.getServiceState());
|
||||||
|
|
||||||
|
int waitCount = 0;
|
||||||
|
while (nm.getServiceState() == STATE.INITED && waitCount++ != 20) {
|
||||||
|
LOG.info("Waiting for NM to start..");
|
||||||
|
if (nmStartError != null) {
|
||||||
|
LOG.error("Error during startup. ", nmStartError);
|
||||||
|
Assert.fail(nmStartError.getCause().getMessage());
|
||||||
|
}
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
if (nm.getServiceState() != STATE.STARTED) {
|
||||||
|
// NM could have failed.
|
||||||
|
Assert.fail("NodeManager failed to start");
|
||||||
|
}
|
||||||
|
|
||||||
|
waitCount = 0;
|
||||||
|
while (heartBeatID <= 3 && waitCount++ != 20) {
|
||||||
|
Thread.sleep(500);
|
||||||
|
}
|
||||||
|
Assert.assertFalse(heartBeatID <= 3);
|
||||||
|
Assert.assertEquals("Number of registered NMs is wrong!!", 1,
|
||||||
|
this.registeredNodes.size());
|
||||||
|
|
||||||
|
MyContainerManager containerManager =
|
||||||
|
(MyContainerManager)nm.getContainerManager();
|
||||||
|
Assert.assertTrue(containerManager.signaled);
|
||||||
|
|
||||||
|
nm.stop();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConcurrentAccessToSystemCredentials(){
|
public void testConcurrentAccessToSystemCredentials(){
|
||||||
final Map<ApplicationId, ByteBuffer> testCredentials = new HashMap<>();
|
final Map<ApplicationId, ByteBuffer> testCredentials = new HashMap<>();
|
||||||
|
|
|
@ -82,6 +82,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
|
||||||
|
@ -464,4 +466,10 @@ public class MockResourceManagerFacade implements
|
||||||
IOException {
|
IOException {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SignalContainerResponse signalContainer(
|
||||||
|
SignalContainerRequest request) throws IOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -73,6 +73,8 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
|
|
||||||
public abstract class BaseContainerManagerTest {
|
public abstract class BaseContainerManagerTest {
|
||||||
|
|
||||||
protected static RecordFactory recordFactory = RecordFactoryProvider
|
protected static RecordFactory recordFactory = RecordFactoryProvider
|
||||||
|
@ -148,7 +150,7 @@ public abstract class BaseContainerManagerTest {
|
||||||
protected ContainerExecutor createContainerExecutor() {
|
protected ContainerExecutor createContainerExecutor() {
|
||||||
DefaultContainerExecutor exec = new DefaultContainerExecutor();
|
DefaultContainerExecutor exec = new DefaultContainerExecutor();
|
||||||
exec.setConf(conf);
|
exec.setConf(conf);
|
||||||
return exec;
|
return spy(exec);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequ
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
|
||||||
|
@ -65,6 +66,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.SerializedException;
|
import org.apache.hadoop.yarn.api.records.SerializedException;
|
||||||
|
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
|
||||||
import org.apache.hadoop.yarn.api.records.Token;
|
import org.apache.hadoop.yarn.api.records.Token;
|
||||||
import org.apache.hadoop.yarn.api.records.URL;
|
import org.apache.hadoop.yarn.api.records.URL;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
@ -75,22 +77,30 @@ import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
|
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
|
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestAuxServices.ServiceA;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestAuxServices.ServiceA;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.ArgumentCaptor;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.mockito.Mockito.never;
|
||||||
|
import static org.mockito.Mockito.timeout;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
public class TestContainerManager extends BaseContainerManagerTest {
|
public class TestContainerManager extends BaseContainerManagerTest {
|
||||||
|
|
||||||
|
@ -262,7 +272,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
||||||
Assert.assertEquals(null, reader.readLine());
|
Assert.assertEquals(null, reader.readLine());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
//@Test
|
||||||
public void testContainerLaunchAndStop() throws IOException,
|
public void testContainerLaunchAndStop() throws IOException,
|
||||||
InterruptedException, YarnException {
|
InterruptedException, YarnException {
|
||||||
containerManager.start();
|
containerManager.start();
|
||||||
|
@ -1173,4 +1183,103 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
||||||
.retrievePassword(containerTokenIdentifier),
|
.retrievePassword(containerTokenIdentifier),
|
||||||
containerTokenIdentifier);
|
containerTokenIdentifier);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOutputThreadDumpSignal() throws IOException,
|
||||||
|
InterruptedException, YarnException {
|
||||||
|
testContainerLaunchAndSignal(SignalContainerCommand.OUTPUT_THREAD_DUMP);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGracefulShutdownSignal() throws IOException,
|
||||||
|
InterruptedException, YarnException {
|
||||||
|
testContainerLaunchAndSignal(SignalContainerCommand.GRACEFUL_SHUTDOWN);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testForcefulShutdownSignal() throws IOException,
|
||||||
|
InterruptedException, YarnException {
|
||||||
|
testContainerLaunchAndSignal(SignalContainerCommand.FORCEFUL_SHUTDOWN);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify signal container request can be delivered from
|
||||||
|
// NodeStatusUpdaterImpl to ContainerExecutor.
|
||||||
|
private void testContainerLaunchAndSignal(SignalContainerCommand command)
|
||||||
|
throws IOException, InterruptedException, YarnException {
|
||||||
|
|
||||||
|
Signal signal = ContainerLaunch.translateCommandToSignal(command);
|
||||||
|
containerManager.start();
|
||||||
|
|
||||||
|
File scriptFile = new File(tmpDir, "scriptFile.sh");
|
||||||
|
PrintWriter fileWriter = new PrintWriter(scriptFile);
|
||||||
|
File processStartFile =
|
||||||
|
new File(tmpDir, "start_file.txt").getAbsoluteFile();
|
||||||
|
fileWriter.write("\numask 0"); // So that start file is readable by the test
|
||||||
|
fileWriter.write("\necho Hello World! > " + processStartFile);
|
||||||
|
fileWriter.write("\necho $$ >> " + processStartFile);
|
||||||
|
fileWriter.write("\nexec sleep 1000s");
|
||||||
|
fileWriter.close();
|
||||||
|
|
||||||
|
ContainerLaunchContext containerLaunchContext =
|
||||||
|
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||||
|
|
||||||
|
// ////// Construct the Container-id
|
||||||
|
ContainerId cId = createContainerId(0);
|
||||||
|
|
||||||
|
URL resource_alpha =
|
||||||
|
ConverterUtils.getYarnUrlFromPath(localFS
|
||||||
|
.makeQualified(new Path(scriptFile.getAbsolutePath())));
|
||||||
|
LocalResource rsrc_alpha =
|
||||||
|
recordFactory.newRecordInstance(LocalResource.class);
|
||||||
|
rsrc_alpha.setResource(resource_alpha);
|
||||||
|
rsrc_alpha.setSize(-1);
|
||||||
|
rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
|
||||||
|
rsrc_alpha.setType(LocalResourceType.FILE);
|
||||||
|
rsrc_alpha.setTimestamp(scriptFile.lastModified());
|
||||||
|
String destinationFile = "dest_file";
|
||||||
|
Map<String, LocalResource> localResources =
|
||||||
|
new HashMap<String, LocalResource>();
|
||||||
|
localResources.put(destinationFile, rsrc_alpha);
|
||||||
|
containerLaunchContext.setLocalResources(localResources);
|
||||||
|
List<String> commands = new ArrayList<String>();
|
||||||
|
commands.add("/bin/bash");
|
||||||
|
commands.add(scriptFile.getAbsolutePath());
|
||||||
|
containerLaunchContext.setCommands(commands);
|
||||||
|
StartContainerRequest scRequest =
|
||||||
|
StartContainerRequest.newInstance(
|
||||||
|
containerLaunchContext,
|
||||||
|
createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
|
||||||
|
user, context.getContainerTokenSecretManager()));
|
||||||
|
List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
|
||||||
|
list.add(scRequest);
|
||||||
|
StartContainersRequest allRequests =
|
||||||
|
StartContainersRequest.newInstance(list);
|
||||||
|
containerManager.startContainers(allRequests);
|
||||||
|
|
||||||
|
int timeoutSecs = 0;
|
||||||
|
while (!processStartFile.exists() && timeoutSecs++ < 20) {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
LOG.info("Waiting for process start-file to be created");
|
||||||
|
}
|
||||||
|
Assert.assertTrue("ProcessStartFile doesn't exist!",
|
||||||
|
processStartFile.exists());
|
||||||
|
|
||||||
|
// Simulate NodeStatusUpdaterImpl sending CMgrSignalContainersEvent
|
||||||
|
SignalContainerRequest signalReq =
|
||||||
|
SignalContainerRequest.newInstance(cId, command);
|
||||||
|
List<SignalContainerRequest> reqs = new ArrayList<SignalContainerRequest>();
|
||||||
|
reqs.add(signalReq);
|
||||||
|
containerManager.handle(new CMgrSignalContainersEvent(reqs));
|
||||||
|
|
||||||
|
final ArgumentCaptor<ContainerSignalContext> signalContextCaptor =
|
||||||
|
ArgumentCaptor.forClass(ContainerSignalContext.class);
|
||||||
|
if (signal.equals(Signal.NULL)) {
|
||||||
|
verify(exec, never()).signalContainer(signalContextCaptor.capture());
|
||||||
|
} else {
|
||||||
|
verify(exec, timeout(10000).atLeastOnce()).signalContainer(signalContextCaptor.capture());
|
||||||
|
ContainerSignalContext signalContext = signalContextCaptor.getAllValues().get(0);
|
||||||
|
Assert.assertEquals(cId, signalContext.getContainer().getContainerId());
|
||||||
|
Assert.assertEquals(signal, signalContext.getSignal());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -93,10 +93,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||||
|
@ -139,6 +141,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeSignalContainerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
|
@ -1391,4 +1394,66 @@ public class ClientRMService extends AbstractService implements
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Signal a container.
|
||||||
|
* After the request passes some sanity check, it will be delivered
|
||||||
|
* to RMNodeImpl so that the next NM heartbeat will pick up the signal request
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public SignalContainerResponse signalContainer(
|
||||||
|
SignalContainerRequest request) throws YarnException, IOException {
|
||||||
|
ContainerId containerId = request.getContainerId();
|
||||||
|
|
||||||
|
UserGroupInformation callerUGI;
|
||||||
|
try {
|
||||||
|
callerUGI = UserGroupInformation.getCurrentUser();
|
||||||
|
} catch (IOException ie) {
|
||||||
|
LOG.info("Error getting UGI ", ie);
|
||||||
|
throw RPCUtil.getRemoteException(ie);
|
||||||
|
}
|
||||||
|
|
||||||
|
ApplicationId applicationId = containerId.getApplicationAttemptId().
|
||||||
|
getApplicationId();
|
||||||
|
RMApp application = this.rmContext.getRMApps().get(applicationId);
|
||||||
|
if (application == null) {
|
||||||
|
RMAuditLogger.logFailure(callerUGI.getUserName(),
|
||||||
|
AuditConstants.SIGNAL_CONTAINER, "UNKNOWN", "ClientRMService",
|
||||||
|
"Trying to signal an absent container", applicationId, containerId);
|
||||||
|
throw RPCUtil
|
||||||
|
.getRemoteException("Trying to signal an absent container "
|
||||||
|
+ containerId);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!checkAccess(callerUGI, application.getUser(),
|
||||||
|
ApplicationAccessType.MODIFY_APP, application)) {
|
||||||
|
RMAuditLogger.logFailure(callerUGI.getShortUserName(),
|
||||||
|
AuditConstants.SIGNAL_CONTAINER, "User doesn't have permissions to "
|
||||||
|
+ ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService",
|
||||||
|
AuditConstants.UNAUTHORIZED_USER, applicationId);
|
||||||
|
throw RPCUtil.getRemoteException(new AccessControlException("User "
|
||||||
|
+ callerUGI.getShortUserName() + " cannot perform operation "
|
||||||
|
+ ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
|
||||||
|
}
|
||||||
|
|
||||||
|
RMContainer container = scheduler.getRMContainer(containerId);
|
||||||
|
if (container != null) {
|
||||||
|
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||||
|
new RMNodeSignalContainerEvent(container.getContainer().getNodeId(),
|
||||||
|
request));
|
||||||
|
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
|
||||||
|
AuditConstants.SIGNAL_CONTAINER, "ClientRMService", applicationId,
|
||||||
|
containerId);
|
||||||
|
} else {
|
||||||
|
RMAuditLogger.logFailure(callerUGI.getUserName(),
|
||||||
|
AuditConstants.SIGNAL_CONTAINER, "UNKNOWN", "ClientRMService",
|
||||||
|
"Trying to signal an absent container", applicationId, containerId);
|
||||||
|
throw RPCUtil
|
||||||
|
.getRemoteException("Trying to signal an absent container "
|
||||||
|
+ containerId);
|
||||||
|
}
|
||||||
|
|
||||||
|
return recordFactory
|
||||||
|
.newRecordInstance(SignalContainerResponse.class);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,6 +58,7 @@ public class RMAuditLogger {
|
||||||
"Update Application Priority Request";
|
"Update Application Priority Request";
|
||||||
public static final String CHANGE_CONTAINER_RESOURCE =
|
public static final String CHANGE_CONTAINER_RESOURCE =
|
||||||
"AM Changed Container Resource";
|
"AM Changed Container Resource";
|
||||||
|
public static final String SIGNAL_CONTAINER = "Signal Container Request";
|
||||||
|
|
||||||
// Some commonly used descriptions
|
// Some commonly used descriptions
|
||||||
public static final String UNAUTHORIZED_USER = "Unauthorized user";
|
public static final String UNAUTHORIZED_USER = "Unauthorized user";
|
||||||
|
|
|
@ -44,6 +44,9 @@ public enum RMNodeEventType {
|
||||||
CLEANUP_CONTAINER,
|
CLEANUP_CONTAINER,
|
||||||
DECREASE_CONTAINER,
|
DECREASE_CONTAINER,
|
||||||
|
|
||||||
|
// Source: ClientRMService
|
||||||
|
SIGNAL_CONTAINER,
|
||||||
|
|
||||||
// Source: RMAppAttempt
|
// Source: RMAppAttempt
|
||||||
FINISHED_CONTAINERS_PULLED_BY_AM,
|
FINISHED_CONTAINERS_PULLED_BY_AM,
|
||||||
|
|
||||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.net.Node;
|
import org.apache.hadoop.net.Node;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
@ -122,6 +123,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>(
|
private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>(
|
||||||
new ContainerIdComparator());
|
new ContainerIdComparator());
|
||||||
|
|
||||||
|
/* set of containers that need to be signaled */
|
||||||
|
private final List<SignalContainerRequest> containersToSignal =
|
||||||
|
new ArrayList<SignalContainerRequest>();
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* set of containers to notify NM to remove them from its context. Currently,
|
* set of containers to notify NM to remove them from its context. Currently,
|
||||||
* this includes containers that were notified to AM about their completion
|
* this includes containers that were notified to AM about their completion
|
||||||
|
@ -194,6 +199,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
|
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
|
||||||
RMNodeEventType.DECREASE_CONTAINER,
|
RMNodeEventType.DECREASE_CONTAINER,
|
||||||
new DecreaseContainersTransition())
|
new DecreaseContainersTransition())
|
||||||
|
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
|
||||||
|
RMNodeEventType.SIGNAL_CONTAINER, new SignalContainerTransition())
|
||||||
.addTransition(NodeState.RUNNING, NodeState.SHUTDOWN,
|
.addTransition(NodeState.RUNNING, NodeState.SHUTDOWN,
|
||||||
RMNodeEventType.SHUTDOWN,
|
RMNodeEventType.SHUTDOWN,
|
||||||
new DeactivateNodeTransition(NodeState.SHUTDOWN))
|
new DeactivateNodeTransition(NodeState.SHUTDOWN))
|
||||||
|
@ -288,6 +295,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
.addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
|
.addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
|
||||||
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
|
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
|
||||||
new AddContainersToBeRemovedFromNMTransition())
|
new AddContainersToBeRemovedFromNMTransition())
|
||||||
|
.addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
|
||||||
|
RMNodeEventType.SIGNAL_CONTAINER, new SignalContainerTransition())
|
||||||
.addTransition(NodeState.UNHEALTHY, NodeState.SHUTDOWN,
|
.addTransition(NodeState.UNHEALTHY, NodeState.SHUTDOWN,
|
||||||
RMNodeEventType.SHUTDOWN,
|
RMNodeEventType.SHUTDOWN,
|
||||||
new DeactivateNodeTransition(NodeState.SHUTDOWN))
|
new DeactivateNodeTransition(NodeState.SHUTDOWN))
|
||||||
|
@ -491,8 +500,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
response.addAllApplicationsToCleanup(this.finishedApplications);
|
response.addAllApplicationsToCleanup(this.finishedApplications);
|
||||||
response.addContainersToBeRemovedFromNM(
|
response.addContainersToBeRemovedFromNM(
|
||||||
new ArrayList<ContainerId>(this.containersToBeRemovedFromNM));
|
new ArrayList<ContainerId>(this.containersToBeRemovedFromNM));
|
||||||
|
response.addAllContainersToSignal(this.containersToSignal);
|
||||||
this.containersToClean.clear();
|
this.containersToClean.clear();
|
||||||
this.finishedApplications.clear();
|
this.finishedApplications.clear();
|
||||||
|
this.containersToSignal.clear();
|
||||||
this.containersToBeRemovedFromNM.clear();
|
this.containersToBeRemovedFromNM.clear();
|
||||||
} finally {
|
} finally {
|
||||||
this.writeLock.unlock();
|
this.writeLock.unlock();
|
||||||
|
@ -1090,6 +1101,16 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class SignalContainerTransition implements
|
||||||
|
SingleArcTransition<RMNodeImpl, RMNodeEvent> {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
|
||||||
|
rmNode.containersToSignal.add(((
|
||||||
|
RMNodeSignalContainerEvent) event).getSignalRequest());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<UpdatedContainerInfo> pullContainerUpdates() {
|
public List<UpdatedContainerInfo> pullContainerUpdates() {
|
||||||
List<UpdatedContainerInfo> latestContainerInfoList =
|
List<UpdatedContainerInfo> latestContainerInfoList =
|
||||||
|
|
|
@ -0,0 +1,38 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
||||||
|
|
||||||
|
public class RMNodeSignalContainerEvent extends RMNodeEvent {
|
||||||
|
|
||||||
|
private SignalContainerRequest signalRequest;
|
||||||
|
|
||||||
|
public RMNodeSignalContainerEvent(NodeId nodeId,
|
||||||
|
SignalContainerRequest signalRequest) {
|
||||||
|
super(nodeId, RMNodeEventType.SIGNAL_CONTAINER);
|
||||||
|
this.signalRequest = signalRequest;
|
||||||
|
}
|
||||||
|
|
||||||
|
public SignalContainerRequest getSignalRequest() {
|
||||||
|
return this.signalRequest;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||||
|
@ -52,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
|
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
|
||||||
|
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
|
@ -814,4 +816,12 @@ public class MockRM extends ResourceManager {
|
||||||
public RMActiveServices getRMActiveService() {
|
public RMActiveServices getRMActiveService() {
|
||||||
return activeServices;
|
return activeServices;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void signalContainer(ContainerId containerId, SignalContainerCommand command)
|
||||||
|
throws Exception {
|
||||||
|
ApplicationClientProtocol client = getClientRMService();
|
||||||
|
SignalContainerRequest req =
|
||||||
|
SignalContainerRequest.newInstance(containerId, command);
|
||||||
|
client.signalContainer(req);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,113 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.junit.Assert;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
|
import org.apache.log4j.Level;
|
||||||
|
import org.apache.log4j.LogManager;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestSignalContainer {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory
|
||||||
|
.getLog(TestSignalContainer.class);
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSignalRequestDeliveryToNM() throws Exception {
|
||||||
|
Logger rootLogger = LogManager.getRootLogger();
|
||||||
|
rootLogger.setLevel(Level.DEBUG);
|
||||||
|
MockRM rm = new MockRM();
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
MockNM nm1 = rm.registerNode("h1:1234", 5000);
|
||||||
|
|
||||||
|
RMApp app = rm.submitApp(2000);
|
||||||
|
|
||||||
|
//kick the scheduling
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
|
||||||
|
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
||||||
|
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
|
||||||
|
am.registerAppAttempt();
|
||||||
|
|
||||||
|
//request for containers
|
||||||
|
final int request = 2;
|
||||||
|
am.allocate("h1" , 1000, request, new ArrayList<ContainerId>());
|
||||||
|
|
||||||
|
//kick the scheduler
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
List<Container> conts = null;
|
||||||
|
int contReceived = 0;
|
||||||
|
int waitCount = 0;
|
||||||
|
while (contReceived < request && waitCount++ < 200) {
|
||||||
|
LOG.info("Got " + contReceived + " containers. Waiting to get "
|
||||||
|
+ request);
|
||||||
|
Thread.sleep(100);
|
||||||
|
conts = am.allocate(new ArrayList<ResourceRequest>(),
|
||||||
|
new ArrayList<ContainerId>()).getAllocatedContainers();
|
||||||
|
contReceived += conts.size();
|
||||||
|
}
|
||||||
|
Assert.assertEquals(request, contReceived);
|
||||||
|
|
||||||
|
for(Container container : conts) {
|
||||||
|
rm.signalContainer(container.getId(),
|
||||||
|
SignalContainerCommand.OUTPUT_THREAD_DUMP);
|
||||||
|
}
|
||||||
|
|
||||||
|
NodeHeartbeatResponse resp;
|
||||||
|
List<SignalContainerRequest> contsToSignal;
|
||||||
|
int signaledConts = 0;
|
||||||
|
|
||||||
|
waitCount = 0;
|
||||||
|
while ( signaledConts < request && waitCount++ < 200) {
|
||||||
|
LOG.info("Waiting to get signalcontainer events.. signaledConts: "
|
||||||
|
+ signaledConts);
|
||||||
|
resp = nm1.nodeHeartbeat(true);
|
||||||
|
contsToSignal = resp.getContainersToSignalList();
|
||||||
|
signaledConts += contsToSignal.size();
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify NM receives the expected number of signal container requests.
|
||||||
|
Assert.assertEquals(request, signaledConts);
|
||||||
|
|
||||||
|
am.unregisterAppAttempt();
|
||||||
|
nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE);
|
||||||
|
am.waitForState(RMAppAttemptState.FINISHED);
|
||||||
|
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue