YARN-5611. Provide an API to update lifetime of an application. Contributed by Rohith Sharma K S
This commit is contained in:
parent
8a2998c08c
commit
a422740bd7
|
@ -124,6 +124,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
|
@ -485,6 +487,13 @@ public class TestClientRedirect {
|
|||
SignalContainerRequest request) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
|
||||
UpdateApplicationTimeoutsRequest request)
|
||||
throws YarnException, IOException {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
class HistoryService extends AMService implements HSClientProtocol {
|
||||
|
|
|
@ -59,6 +59,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||
|
@ -566,4 +568,25 @@ public interface ApplicationClientProtocol extends ApplicationBaseProtocol {
|
|||
SignalContainerResponse signalToContainer(
|
||||
SignalContainerRequest request) throws YarnException,
|
||||
IOException;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* The interface used by client to set ApplicationTimeouts of an application.
|
||||
* The UpdateApplicationTimeoutsRequest should specify each timeout value as an
|
||||
* absolute time in ISO8601 format <b>yyyy-MM-dd'T'HH:mm:ss.SSSZ</b>.
|
||||
* </p>
|
||||
* <b>Note:</b> If application timeout value is less than or equal to current
|
||||
* time then update application throws YarnException.
|
||||
* @param request to set ApplicationTimeouts of an application
|
||||
* @return an empty response indicating that the update has completed successfully.
|
||||
* @throws YarnException if update request has empty values or application is
|
||||
* in completing states.
|
||||
* @throws IOException on IO failures
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
@Idempotent
|
||||
public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
|
||||
UpdateApplicationTimeoutsRequest request)
|
||||
throws YarnException, IOException;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,81 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.api.protocolrecords;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* The request sent by the client to the <code>ResourceManager</code> to set or
|
||||
* update the application timeout.
|
||||
* </p>
|
||||
* <p>
|
||||
* The request includes the {@link ApplicationId} of the application and timeout
|
||||
* to be set for an application.
|
||||
* </p>
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract class UpdateApplicationTimeoutsRequest {
|
||||
public static UpdateApplicationTimeoutsRequest newInstance(
|
||||
ApplicationId applicationId,
|
||||
Map<ApplicationTimeoutType, String> applicationTimeouts) {
|
||||
UpdateApplicationTimeoutsRequest request =
|
||||
Records.newRecord(UpdateApplicationTimeoutsRequest.class);
|
||||
request.setApplicationId(applicationId);
|
||||
request.setApplicationTimeouts(applicationTimeouts);
|
||||
return request;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the <code>ApplicationId</code> of the application.
|
||||
* @return <code>ApplicationId</code> of the application
|
||||
*/
|
||||
public abstract ApplicationId getApplicationId();
|
||||
|
||||
/**
|
||||
* Set the <code>ApplicationId</code> of the application.
|
||||
* @param applicationId <code>ApplicationId</code> of the application
|
||||
*/
|
||||
public abstract void setApplicationId(ApplicationId applicationId);
|
||||
|
||||
/**
|
||||
* Get <code>ApplicationTimeouts</code> of the application. Timeout value is
|
||||
* in ISO8601 standard with format <b>yyyy-MM-dd'T'HH:mm:ss.SSSZ</b>.
|
||||
* @return all <code>ApplicationTimeouts</code> of the application.
|
||||
*/
|
||||
public abstract Map<ApplicationTimeoutType, String> getApplicationTimeouts();
|
||||
|
||||
/**
|
||||
* Set the <code>ApplicationTimeouts</code> for the application. Timeout value
|
||||
* is absolute. Timeout value should meet ISO8601 format. The supported ISO8601
|
||||
* format is <b>yyyy-MM-dd'T'HH:mm:ss.SSSZ</b>. All pre-existing Map entries
|
||||
* are cleared before adding the new Map.
|
||||
* @param applicationTimeouts <code>ApplicationTimeouts</code> for the
|
||||
* application
|
||||
*/
|
||||
public abstract void setApplicationTimeouts(
|
||||
Map<ApplicationTimeoutType, String> applicationTimeouts);
|
||||
}
|
|
@ -0,0 +1,46 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.api.protocolrecords;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* The response sent by the <code>ResourceManager</code> to the client on update
|
||||
* application timeout.
|
||||
* </p>
|
||||
* <p>
|
||||
* A response without exception means that the update has completed
|
||||
* successfully.
|
||||
* </p>
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract class UpdateApplicationTimeoutsResponse {
|
||||
|
||||
public static UpdateApplicationTimeoutsResponse newInstance() {
|
||||
UpdateApplicationTimeoutsResponse response =
|
||||
Records.newRecord(UpdateApplicationTimeoutsResponse.class);
|
||||
return response;
|
||||
}
|
||||
}
|
|
@ -549,6 +549,10 @@ public abstract class ApplicationSubmissionContext {
|
|||
/**
|
||||
* Set the <code>ApplicationTimeouts</code> for the application in seconds.
|
||||
* All pre-existing Map entries are cleared before adding the new Map.
|
||||
* <p>
|
||||
* <b>Note:</b> If application timeout value is less than or equal to zero
|
||||
* then application submission will throw an exception.
|
||||
* </p>
|
||||
* @param applicationTimeouts <code>ApplicationTimeouts</code> for the
|
||||
* application
|
||||
*/
|
||||
|
|
|
@ -1496,10 +1496,10 @@ public class YarnConfiguration extends Configuration {
|
|||
|
||||
|
||||
// Configurations for application lifetime monitor feature
|
||||
public static final String RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS =
|
||||
RM_PREFIX + "application-timeouts.lifetime-monitor.interval-ms";
|
||||
public static final String RM_APPLICATION_MONITOR_INTERVAL_MS =
|
||||
RM_PREFIX + "application-timeouts.monitor.interval-ms";
|
||||
|
||||
public static final long DEFAULT_RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS =
|
||||
public static final long DEFAULT_RM_APPLICATION_MONITOR_INTERVAL_MS =
|
||||
60000;
|
||||
|
||||
/**
|
||||
|
|
|
@ -60,4 +60,5 @@ service ApplicationClientProtocolService {
|
|||
rpc getClusterNodeLabels (GetClusterNodeLabelsRequestProto) returns (GetClusterNodeLabelsResponseProto);
|
||||
rpc updateApplicationPriority (UpdateApplicationPriorityRequestProto) returns (UpdateApplicationPriorityResponseProto);
|
||||
rpc signalToContainer(SignalContainerRequestProto) returns (SignalContainerResponseProto);
|
||||
rpc updateApplicationTimeouts (UpdateApplicationTimeoutsRequestProto) returns (UpdateApplicationTimeoutsResponseProto);
|
||||
}
|
||||
|
|
|
@ -382,6 +382,11 @@ message ApplicationTimeoutMapProto {
|
|||
optional int64 timeout = 2;
|
||||
}
|
||||
|
||||
message ApplicationUpdateTimeoutMapProto {
|
||||
optional ApplicationTimeoutTypeProto application_timeout_type = 1;
|
||||
optional string expire_time = 2;
|
||||
}
|
||||
|
||||
message LogAggregationContextProto {
|
||||
optional string include_pattern = 1 [default = ".*"];
|
||||
optional string exclude_pattern = 2 [default = ""];
|
||||
|
|
|
@ -266,6 +266,15 @@ message SignalContainerRequestProto {
|
|||
message SignalContainerResponseProto {
|
||||
}
|
||||
|
||||
message UpdateApplicationTimeoutsRequestProto {
|
||||
required ApplicationIdProto applicationId = 1;
|
||||
repeated ApplicationUpdateTimeoutMapProto application_timeouts = 2;
|
||||
}
|
||||
|
||||
message UpdateApplicationTimeoutsResponseProto {
|
||||
repeated ApplicationUpdateTimeoutMapProto application_timeouts = 1;
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////
|
||||
/////// client_NM_Protocol ///////////////////////////
|
||||
//////////////////////////////////////////////////////
|
||||
|
|
|
@ -83,6 +83,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||
|
@ -139,6 +141,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateReque
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationTimeoutsRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationTimeoutsResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
|
||||
|
@ -165,7 +169,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationDeleteRequestPr
|
|||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto;
|
||||
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
@ -600,4 +604,19 @@ public class ApplicationClientProtocolPBClientImpl implements ApplicationClientP
|
|||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
|
||||
UpdateApplicationTimeoutsRequest request)
|
||||
throws YarnException, IOException {
|
||||
UpdateApplicationTimeoutsRequestProto requestProto =
|
||||
((UpdateApplicationTimeoutsRequestPBImpl) request).getProto();
|
||||
try {
|
||||
return new UpdateApplicationTimeoutsResponsePBImpl(
|
||||
proxy.updateApplicationTimeouts(null, requestProto));
|
||||
} catch (ServiceException e) {
|
||||
RPCUtil.unwrapAndThrowException(e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl;
|
||||
|
@ -111,6 +112,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequest
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationTimeoutsRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationTimeoutsResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
@ -162,6 +165,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationListResponsePro
|
|||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto;
|
||||
|
||||
|
@ -609,4 +614,21 @@ public class ApplicationClientProtocolPBServiceImpl implements ApplicationClient
|
|||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateApplicationTimeoutsResponseProto updateApplicationTimeouts(
|
||||
RpcController controller, UpdateApplicationTimeoutsRequestProto proto)
|
||||
throws ServiceException {
|
||||
UpdateApplicationTimeoutsRequestPBImpl request =
|
||||
new UpdateApplicationTimeoutsRequestPBImpl(proto);
|
||||
try {
|
||||
UpdateApplicationTimeoutsResponse response =
|
||||
real.updateApplicationTimeouts(request);
|
||||
return ((UpdateApplicationTimeoutsResponsePBImpl) response).getProto();
|
||||
} catch (YarnException e) {
|
||||
throw new ServiceException(e);
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,220 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationUpdateTimeoutMapProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsRequestProtoOrBuilder;
|
||||
|
||||
import com.google.protobuf.TextFormat;
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public class UpdateApplicationTimeoutsRequestPBImpl
|
||||
extends UpdateApplicationTimeoutsRequest {
|
||||
|
||||
UpdateApplicationTimeoutsRequestProto proto =
|
||||
UpdateApplicationTimeoutsRequestProto.getDefaultInstance();
|
||||
UpdateApplicationTimeoutsRequestProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
|
||||
private ApplicationId applicationId = null;
|
||||
private Map<ApplicationTimeoutType, String> applicationTimeouts = null;
|
||||
|
||||
public UpdateApplicationTimeoutsRequestPBImpl() {
|
||||
builder = UpdateApplicationTimeoutsRequestProto.newBuilder();
|
||||
}
|
||||
|
||||
public UpdateApplicationTimeoutsRequestPBImpl(
|
||||
UpdateApplicationTimeoutsRequestProto proto) {
|
||||
this.proto = proto;
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
/**
 * Returns the protobuf form of this request.
 * Locally cached fields are first merged into the proto via
 * {@code mergeLocalToProto()}; afterwards the record is marked as
 * proto-backed ({@code viaProto = true}).
 *
 * @return the proto held by this record after the merge
 */
public UpdateApplicationTimeoutsRequestProto getProto() {
|
||||
mergeLocalToProto();
|
||||
// Keep the current proto if already proto-backed, otherwise build one.
proto = viaProto ? proto : builder.build();
|
||||
viaProto = true;
|
||||
return proto;
|
||||
}
|
||||
|
||||
/**
 * Copies the locally cached fields into the builder and rebuilds
 * {@code proto} from it, leaving the record in the proto-backed state.
 */
private void mergeLocalToProto() {
|
||||
// NOTE: the unbraced 'if' guards only the maybeInitBuilder() call below;
// the remaining statements always run.
if (viaProto)
|
||||
maybeInitBuilder();
|
||||
mergeLocalToBuilder();
|
||||
proto = builder.build();
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
/**
 * Switches the record to the builder-backed state. A new builder seeded
 * from the current {@code proto} is created when the record is
 * proto-backed or no builder exists yet; {@code viaProto} is always
 * cleared.
 */
private void maybeInitBuilder() {
|
||||
if (viaProto || builder == null) {
|
||||
builder = UpdateApplicationTimeoutsRequestProto.newBuilder(proto);
|
||||
}
|
||||
viaProto = false;
|
||||
}
|
||||
|
||||
private void mergeLocalToBuilder() {
|
||||
if (this.applicationId != null) {
|
||||
builder.setApplicationId(convertToProtoFormat(this.applicationId));
|
||||
}
|
||||
if (this.applicationTimeouts != null) {
|
||||
addApplicationTimeouts();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApplicationId getApplicationId() {
|
||||
UpdateApplicationTimeoutsRequestProtoOrBuilder p =
|
||||
viaProto ? proto : builder;
|
||||
if (this.applicationId != null) {
|
||||
return applicationId;
|
||||
} // Else via proto
|
||||
if (!p.hasApplicationId()) {
|
||||
return null;
|
||||
}
|
||||
applicationId = convertFromProtoFormat(p.getApplicationId());
|
||||
return applicationId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setApplicationId(ApplicationId applicationId) {
|
||||
maybeInitBuilder();
|
||||
if (applicationId == null) {
|
||||
builder.clearApplicationId();
|
||||
}
|
||||
this.applicationId = applicationId;
|
||||
}
|
||||
|
||||
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
|
||||
return new ApplicationIdPBImpl(p);
|
||||
}
|
||||
|
||||
private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
|
||||
return ((ApplicationIdPBImpl) t).getProto();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return getProto().hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (other == null) {
|
||||
return false;
|
||||
}
|
||||
if (other.getClass().isAssignableFrom(this.getClass())) {
|
||||
return this.getProto().equals(this.getClass().cast(other).getProto());
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return TextFormat.shortDebugString(getProto());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<ApplicationTimeoutType, String> getApplicationTimeouts() {
|
||||
initApplicationTimeout();
|
||||
return this.applicationTimeouts;
|
||||
}
|
||||
|
||||
/**
 * Lazily populates the local {@code applicationTimeouts} map from the
 * proto (or the builder when not proto-backed). Does nothing if the map
 * has already been created.
 */
private void initApplicationTimeout() {
|
||||
if (this.applicationTimeouts != null) {
|
||||
return;
|
||||
}
|
||||
UpdateApplicationTimeoutsRequestProtoOrBuilder p =
|
||||
viaProto ? proto : builder;
|
||||
List<ApplicationUpdateTimeoutMapProto> lists =
|
||||
p.getApplicationTimeoutsList();
|
||||
this.applicationTimeouts =
|
||||
new HashMap<ApplicationTimeoutType, String>(lists.size());
|
||||
// Convert each proto entry into a (timeout type -> expire time string) pair.
for (ApplicationUpdateTimeoutMapProto timeoutProto : lists) {
|
||||
this.applicationTimeouts.put(
|
||||
ProtoUtils
|
||||
.convertFromProtoFormat(timeoutProto.getApplicationTimeoutType()),
|
||||
timeoutProto.getExpireTime());
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Replaces the locally cached timeouts with the entries of the given map.
 * A {@code null} argument is ignored: the method returns without touching
 * any existing entries.
 *
 * @param appTimeouts timeout type to expire-time string entries to store;
 *          the entries are copied into the local map
 */
@Override
|
||||
public void setApplicationTimeouts(
|
||||
Map<ApplicationTimeoutType, String> appTimeouts) {
|
||||
if (appTimeouts == null) {
|
||||
return;
|
||||
}
|
||||
// Make sure the local map exists before clearing and refilling it.
initApplicationTimeout();
|
||||
this.applicationTimeouts.clear();
|
||||
this.applicationTimeouts.putAll(appTimeouts);
|
||||
}
|
||||
|
||||
private void addApplicationTimeouts() {
|
||||
maybeInitBuilder();
|
||||
builder.clearApplicationTimeouts();
|
||||
if (applicationTimeouts == null) {
|
||||
return;
|
||||
}
|
||||
Iterable<? extends ApplicationUpdateTimeoutMapProto> values =
|
||||
new Iterable<ApplicationUpdateTimeoutMapProto>() {
|
||||
|
||||
@Override
|
||||
public Iterator<ApplicationUpdateTimeoutMapProto> iterator() {
|
||||
return new Iterator<ApplicationUpdateTimeoutMapProto>() {
|
||||
private Iterator<ApplicationTimeoutType> iterator =
|
||||
applicationTimeouts.keySet().iterator();
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return iterator.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApplicationUpdateTimeoutMapProto next() {
|
||||
ApplicationTimeoutType key = iterator.next();
|
||||
return ApplicationUpdateTimeoutMapProto.newBuilder()
|
||||
.setExpireTime(applicationTimeouts.get(key))
|
||||
.setApplicationTimeoutType(
|
||||
ProtoUtils.convertToProtoFormat(key))
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
this.builder.addAllApplicationTimeouts(values);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,73 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsResponseProto;
|
||||
|
||||
import com.google.protobuf.TextFormat;
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public class UpdateApplicationTimeoutsResponsePBImpl
|
||||
extends UpdateApplicationTimeoutsResponse {
|
||||
UpdateApplicationTimeoutsResponseProto proto =
|
||||
UpdateApplicationTimeoutsResponseProto.getDefaultInstance();
|
||||
UpdateApplicationTimeoutsResponseProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
|
||||
public UpdateApplicationTimeoutsResponsePBImpl() {
|
||||
builder = UpdateApplicationTimeoutsResponseProto.newBuilder();
|
||||
}
|
||||
|
||||
public UpdateApplicationTimeoutsResponsePBImpl(
|
||||
UpdateApplicationTimeoutsResponseProto proto) {
|
||||
this.proto = proto;
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
/**
 * Returns the protobuf form of this response, building it from the
 * builder if the record is not yet proto-backed, and marks the record as
 * proto-backed.
 *
 * @return the proto held by this record
 */
public UpdateApplicationTimeoutsResponseProto getProto() {
|
||||
proto = viaProto ? proto : builder.build();
|
||||
viaProto = true;
|
||||
return proto;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return getProto().hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (other == null) {
|
||||
return false;
|
||||
}
|
||||
if (other.getClass().isAssignableFrom(this.getClass())) {
|
||||
return this.getProto().equals(this.getClass().cast(other).getProto());
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return TextFormat.shortDebugString(getProto());
|
||||
}
|
||||
}
|
|
@ -46,6 +46,7 @@ public abstract class AbstractLivelinessMonitor<O> extends AbstractService {
|
|||
public static final int DEFAULT_EXPIRE = 5*60*1000;//5 mins
|
||||
private long expireInterval = DEFAULT_EXPIRE;
|
||||
private long monitorInterval = expireInterval / 3;
|
||||
private volatile boolean resetTimerOnStart = true;
|
||||
|
||||
private final Clock clock;
|
||||
|
||||
|
@ -105,8 +106,8 @@ public abstract class AbstractLivelinessMonitor<O> extends AbstractService {
|
|||
register(ob, clock.getTime());
|
||||
}
|
||||
|
||||
public synchronized void register(O ob, long monitorStartTime) {
|
||||
running.put(ob, monitorStartTime);
|
||||
public synchronized void register(O ob, long expireTime) {
|
||||
running.put(ob, expireTime);
|
||||
}
|
||||
|
||||
public synchronized void unregister(O ob) {
|
||||
|
@ -114,12 +115,18 @@ public abstract class AbstractLivelinessMonitor<O> extends AbstractService {
|
|||
}
|
||||
|
||||
public synchronized void resetTimer() {
|
||||
long time = clock.getTime();
|
||||
for (O ob : running.keySet()) {
|
||||
running.put(ob, time);
|
||||
if (resetTimerOnStart) {
|
||||
long time = clock.getTime();
|
||||
for (O ob : running.keySet()) {
|
||||
running.put(ob, time);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void setResetTimeOnStart(boolean resetTimeOnStart) {
|
||||
this.resetTimerOnStart = resetTimeOnStart;
|
||||
}
|
||||
|
||||
private class PingChecker implements Runnable {
|
||||
|
||||
@Override
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.yarn.util;
|
||||
|
||||
import java.text.ParseException;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
|
||||
|
@ -29,6 +30,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|||
public class Times {
|
||||
private static final Log LOG = LogFactory.getLog(Times.class);
|
||||
|
||||
static final String ISO8601DATEFORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";
|
||||
|
||||
// This format should match the one used in yarn.dt.plugins.js
|
||||
static final ThreadLocal<SimpleDateFormat> dateFormat =
|
||||
new ThreadLocal<SimpleDateFormat>() {
|
||||
|
@ -37,6 +40,14 @@ public class Times {
|
|||
}
|
||||
};
|
||||
|
||||
static final ThreadLocal<SimpleDateFormat> isoFormat =
|
||||
new ThreadLocal<SimpleDateFormat>() {
|
||||
@Override
|
||||
protected SimpleDateFormat initialValue() {
|
||||
return new SimpleDateFormat(ISO8601DATEFORMAT);
|
||||
}
|
||||
};
|
||||
|
||||
public static long elapsed(long started, long finished) {
|
||||
return Times.elapsed(started, finished, true);
|
||||
}
|
||||
|
@ -74,4 +85,26 @@ public class Times {
|
|||
return ts > 0 ? String.valueOf(dateFormat.get().format(new Date(ts)))
|
||||
: "N/A";
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a timestamp, returns an ISO-8601 formatted string in the format
|
||||
* "yyyy-MM-dd'T'HH:mm:ss.SSSZ".
|
||||
* @param ts to be formatted in ISO format.
|
||||
* @return ISO 8601 formatted string.
|
||||
*/
|
||||
public static String formatISO8601(long ts) {
|
||||
return isoFormat.get().format(new Date(ts));
|
||||
}
|
||||
|
||||
/**
|
||||
* Given ISO formatted string with format "yyyy-MM-dd'T'HH:mm:ss.SSSZ", return
|
||||
* epoch time for the local time zone.
|
||||
* @param isoString in format of "yyyy-MM-dd'T'HH:mm:ss.SSSZ".
|
||||
* @return epoch time for local time zone.
|
||||
* @throws ParseException if given ISO formatted string can not be parsed.
|
||||
*/
|
||||
public static long parseISO8601ToLocalTimeInMillis(String isoString)
|
||||
throws ParseException {
|
||||
return isoFormat.get().parse(isoString).getTime();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2990,9 +2990,9 @@
|
|||
|
||||
<property>
|
||||
<description>
|
||||
The RMAppLifetimeMonitor Service uses this value as lifetime monitor interval
|
||||
The RMAppLifetimeMonitor Service uses this value as monitor interval
|
||||
</description>
|
||||
<name>yarn.resourcemanager.application-timeouts.lifetime-monitor.interval-ms</name>
|
||||
<name>yarn.resourcemanager.application-timeouts.monitor.interval-ms</name>
|
||||
<value>60000</value>
|
||||
</property>
|
||||
|
||||
|
|
|
@ -91,6 +91,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||
import org.apache.hadoop.yarn.api.records.AMCommand;
|
||||
|
@ -496,4 +498,11 @@ return null;
|
|||
FailApplicationAttemptRequest request) throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
// Updating application timeouts is not implemented by this class: every
// call throws NotImplementedException regardless of the request contents.
@Override
|
||||
public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
|
||||
UpdateApplicationTimeoutsRequest request)
|
||||
throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -108,12 +108,15 @@ import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerReport;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
|
@ -1571,37 +1574,11 @@ public class ClientRMService extends AbstractService implements
|
|||
|
||||
ApplicationId applicationId = request.getApplicationId();
|
||||
Priority newAppPriority = request.getApplicationPriority();
|
||||
UserGroupInformation callerUGI;
|
||||
try {
|
||||
callerUGI = UserGroupInformation.getCurrentUser();
|
||||
} catch (IOException ie) {
|
||||
LOG.info("Error getting UGI ", ie);
|
||||
RMAuditLogger.logFailure("UNKNOWN", AuditConstants.UPDATE_APP_PRIORITY,
|
||||
"UNKNOWN", "ClientRMService", "Error getting UGI", applicationId);
|
||||
throw RPCUtil.getRemoteException(ie);
|
||||
}
|
||||
|
||||
RMApp application = this.rmContext.getRMApps().get(applicationId);
|
||||
if (application == null) {
|
||||
RMAuditLogger.logFailure(callerUGI.getUserName(),
|
||||
AuditConstants.UPDATE_APP_PRIORITY, "UNKNOWN", "ClientRMService",
|
||||
"Trying to update priority of an absent application", applicationId);
|
||||
throw new ApplicationNotFoundException(
|
||||
"Trying to update priority of an absent application "
|
||||
+ applicationId);
|
||||
}
|
||||
|
||||
if (!checkAccess(callerUGI, application.getUser(),
|
||||
ApplicationAccessType.MODIFY_APP, application)) {
|
||||
RMAuditLogger.logFailure(callerUGI.getShortUserName(),
|
||||
AuditConstants.UPDATE_APP_PRIORITY,
|
||||
"User doesn't have permissions to "
|
||||
+ ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService",
|
||||
AuditConstants.UNAUTHORIZED_USER, applicationId);
|
||||
throw RPCUtil.getRemoteException(new AccessControlException("User "
|
||||
+ callerUGI.getShortUserName() + " cannot perform operation "
|
||||
+ ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
|
||||
}
|
||||
UserGroupInformation callerUGI =
|
||||
getCallerUgi(applicationId, AuditConstants.UPDATE_APP_PRIORITY);
|
||||
RMApp application = verifyUserAccessForRMApp(applicationId, callerUGI,
|
||||
AuditConstants.UPDATE_APP_PRIORITY);
|
||||
|
||||
UpdateApplicationPriorityResponse response = recordFactory
|
||||
.newRecordInstance(UpdateApplicationPriorityResponse.class);
|
||||
|
@ -1706,4 +1683,104 @@ public class ClientRMService extends AbstractService implements
|
|||
.newRecordInstance(SignalContainerResponse.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
|
||||
UpdateApplicationTimeoutsRequest request)
|
||||
throws YarnException, IOException {
|
||||
|
||||
ApplicationId applicationId = request.getApplicationId();
|
||||
Map<ApplicationTimeoutType, String> applicationTimeouts =
|
||||
request.getApplicationTimeouts();
|
||||
|
||||
UserGroupInformation callerUGI =
|
||||
getCallerUgi(applicationId, AuditConstants.UPDATE_APP_TIMEOUTS);
|
||||
RMApp application = verifyUserAccessForRMApp(applicationId, callerUGI,
|
||||
AuditConstants.UPDATE_APP_TIMEOUTS);
|
||||
|
||||
if (applicationTimeouts.isEmpty()) {
|
||||
String message =
|
||||
"At least one ApplicationTimeoutType should be configured"
|
||||
+ " for updating timeouts.";
|
||||
RMAuditLogger.logFailure(callerUGI.getShortUserName(),
|
||||
AuditConstants.UPDATE_APP_TIMEOUTS, "UNKNOWN", "ClientRMService",
|
||||
message, applicationId);
|
||||
throw RPCUtil.getRemoteException(message);
|
||||
}
|
||||
|
||||
UpdateApplicationTimeoutsResponse response = recordFactory
|
||||
.newRecordInstance(UpdateApplicationTimeoutsResponse.class);
|
||||
|
||||
RMAppState state = application.getState();
|
||||
if (!EnumSet
|
||||
.of(RMAppState.SUBMITTED, RMAppState.ACCEPTED, RMAppState.RUNNING)
|
||||
.contains(state)) {
|
||||
if (COMPLETED_APP_STATES.contains(state)) {
|
||||
// If Application is in any of the final states, update timeout
|
||||
// can be skipped rather throwing exception.
|
||||
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
|
||||
AuditConstants.UPDATE_APP_TIMEOUTS, "ClientRMService",
|
||||
applicationId);
|
||||
return response;
|
||||
}
|
||||
String msg =
|
||||
"Application is in " + state + " state can not update timeout.";
|
||||
RMAuditLogger.logFailure(callerUGI.getShortUserName(),
|
||||
AuditConstants.UPDATE_APP_TIMEOUTS, "UNKNOWN", "ClientRMService",
|
||||
msg);
|
||||
throw RPCUtil.getRemoteException(msg);
|
||||
}
|
||||
|
||||
try {
|
||||
rmAppManager.updateApplicationTimeout(application, applicationTimeouts);
|
||||
} catch (YarnException ex) {
|
||||
RMAuditLogger.logFailure(callerUGI.getShortUserName(),
|
||||
AuditConstants.UPDATE_APP_TIMEOUTS, "UNKNOWN", "ClientRMService",
|
||||
ex.getMessage());
|
||||
throw RPCUtil.getRemoteException(ex);
|
||||
}
|
||||
|
||||
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
|
||||
AuditConstants.UPDATE_APP_TIMEOUTS, "ClientRMService", applicationId);
|
||||
return response;
|
||||
}
|
||||
|
||||
private UserGroupInformation getCallerUgi(ApplicationId applicationId,
|
||||
String operation) throws YarnException {
|
||||
UserGroupInformation callerUGI;
|
||||
try {
|
||||
callerUGI = UserGroupInformation.getCurrentUser();
|
||||
} catch (IOException ie) {
|
||||
LOG.info("Error getting UGI ", ie);
|
||||
RMAuditLogger.logFailure("UNKNOWN", operation, "UNKNOWN",
|
||||
"ClientRMService", "Error getting UGI", applicationId);
|
||||
throw RPCUtil.getRemoteException(ie);
|
||||
}
|
||||
return callerUGI;
|
||||
}
|
||||
|
||||
private RMApp verifyUserAccessForRMApp(ApplicationId applicationId,
|
||||
UserGroupInformation callerUGI, String operation) throws YarnException {
|
||||
RMApp application = this.rmContext.getRMApps().get(applicationId);
|
||||
if (application == null) {
|
||||
RMAuditLogger.logFailure(callerUGI.getUserName(), operation, "UNKNOWN",
|
||||
"ClientRMService",
|
||||
"Trying to " + operation + " of an absent application",
|
||||
applicationId);
|
||||
throw new ApplicationNotFoundException("Trying to " + operation
|
||||
+ " of an absent application " + applicationId);
|
||||
}
|
||||
|
||||
if (!checkAccess(callerUGI, application.getUser(),
|
||||
ApplicationAccessType.MODIFY_APP, application)) {
|
||||
RMAuditLogger.logFailure(callerUGI.getShortUserName(), operation,
|
||||
"User doesn't have permissions to "
|
||||
+ ApplicationAccessType.MODIFY_APP.toString(),
|
||||
"ClientRMService", AuditConstants.UNAUTHORIZED_USER, applicationId);
|
||||
throw RPCUtil.getRemoteException(new AccessControlException("User "
|
||||
+ callerUGI.getShortUserName() + " cannot perform operation "
|
||||
+ ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
|
||||
}
|
||||
return application;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.util.StringUtils;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
|
@ -66,6 +67,8 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
|||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
|
||||
/**
|
||||
* This class manages the list of applications for the resource manager.
|
||||
|
@ -501,4 +504,38 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
|||
LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
|
||||
}
|
||||
}
|
||||
|
||||
// transaction method.
|
||||
public void updateApplicationTimeout(RMApp app,
|
||||
Map<ApplicationTimeoutType, String> newTimeoutInISO8601Format)
|
||||
throws YarnException {
|
||||
ApplicationId applicationId = app.getApplicationId();
|
||||
synchronized (applicationId) {
|
||||
Map<ApplicationTimeoutType, Long> newExpireTime = RMServerUtils
|
||||
.validateISO8601AndConvertToLocalTimeEpoch(newTimeoutInISO8601Format);
|
||||
|
||||
SettableFuture<Object> future = SettableFuture.create();
|
||||
|
||||
Map<ApplicationTimeoutType, Long> currentExpireTimeouts =
|
||||
app.getApplicationTimeouts();
|
||||
currentExpireTimeouts.putAll(newExpireTime);
|
||||
|
||||
ApplicationStateData appState =
|
||||
ApplicationStateData.newInstance(app.getSubmitTime(),
|
||||
app.getStartTime(), app.getApplicationSubmissionContext(),
|
||||
app.getUser(), app.getCallerContext());
|
||||
appState.setApplicationTimeouts(currentExpireTimeouts);
|
||||
|
||||
// update to state store. Though it synchronous call, update via future to
|
||||
// know any exception has been set. It is required because in non-HA mode,
|
||||
// state-store errors are skipped.
|
||||
this.rmContext.getStateStore()
|
||||
.updateApplicationStateSynchronously(appState, false, future);
|
||||
|
||||
Futures.get(future, YarnException.class);
|
||||
|
||||
// update in-memory
|
||||
((RMAppImpl) app).updateApplicationTimeout(newExpireTime);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -63,7 +63,9 @@ public class RMAuditLogger {
|
|||
public static final String ALLOC_CONTAINER = "AM Allocated Container";
|
||||
public static final String RELEASE_CONTAINER = "AM Released Container";
|
||||
public static final String UPDATE_APP_PRIORITY =
|
||||
"Update Application Priority Request";
|
||||
"Update Application Priority";
|
||||
public static final String UPDATE_APP_TIMEOUTS =
|
||||
"Update Application Timeouts";
|
||||
public static final String CHANGE_CONTAINER_RESOURCE =
|
||||
"AM Changed Container Resource";
|
||||
public static final String SIGNAL_CONTAINER = "Signal Container Request";
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.text.ParseException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
|
@ -69,6 +70,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
import org.apache.hadoop.yarn.util.Times;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
|
@ -89,6 +93,8 @@ public class RMServerUtils {
|
|||
protected static final RecordFactory RECORD_FACTORY =
|
||||
RecordFactoryProvider.getRecordFactory(null);
|
||||
|
||||
private static Clock clock = SystemClock.getInstance();
|
||||
|
||||
public static List<RMNode> queryRMNodes(RMContext context,
|
||||
EnumSet<NodeState> acceptedStates) {
|
||||
// nodes contains nodes that are NEW, RUNNING, UNHEALTHY or DECOMMISSIONING.
|
||||
|
@ -398,6 +404,7 @@ public class RMServerUtils {
|
|||
case FINISHING:
|
||||
case FINISHED:
|
||||
return YarnApplicationState.FINISHED;
|
||||
case KILLING:
|
||||
case KILLED:
|
||||
return YarnApplicationState.KILLED;
|
||||
case FAILED:
|
||||
|
@ -475,7 +482,7 @@ public class RMServerUtils {
|
|||
if (timeouts != null) {
|
||||
for (Map.Entry<ApplicationTimeoutType, Long> timeout : timeouts
|
||||
.entrySet()) {
|
||||
if (timeout.getValue() < 0) {
|
||||
if (timeout.getValue() <= 0) {
|
||||
String message = "Invalid application timeout, value="
|
||||
+ timeout.getValue() + " for type=" + timeout.getKey();
|
||||
throw new YarnException(message);
|
||||
|
@ -483,4 +490,43 @@ public class RMServerUtils {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate ISO8601 format with epoch time.
|
||||
* @param timeoutsInISO8601 format
|
||||
* @return expire time in local epoch
|
||||
* @throws YarnException if given application timeout value is lesser than
|
||||
* current time.
|
||||
*/
|
||||
public static Map<ApplicationTimeoutType, Long> validateISO8601AndConvertToLocalTimeEpoch(
|
||||
Map<ApplicationTimeoutType, String> timeoutsInISO8601)
|
||||
throws YarnException {
|
||||
long currentTimeMillis = clock.getTime();
|
||||
Map<ApplicationTimeoutType, Long> newApplicationTimeout =
|
||||
new HashMap<ApplicationTimeoutType, Long>();
|
||||
if (timeoutsInISO8601 != null) {
|
||||
for (Map.Entry<ApplicationTimeoutType, String> timeout : timeoutsInISO8601
|
||||
.entrySet()) {
|
||||
long expireTime = 0L;
|
||||
try {
|
||||
expireTime =
|
||||
Times.parseISO8601ToLocalTimeInMillis(timeout.getValue());
|
||||
} catch (ParseException ex) {
|
||||
String message =
|
||||
"Expire time is not in ISO8601 format. ISO8601 supported "
|
||||
+ "format is yyyy-MM-dd'T'HH:mm:ss.SSSZ";
|
||||
throw new YarnException(message);
|
||||
}
|
||||
if (expireTime < currentTimeMillis) {
|
||||
String message =
|
||||
"Expire time is less than current time, current-time="
|
||||
+ Times.formatISO8601(currentTimeMillis) + " expire-time="
|
||||
+ Times.formatISO8601(expireTime);
|
||||
throw new YarnException(message);
|
||||
}
|
||||
newApplicationTimeout.put(timeout.getKey(), expireTime);
|
||||
}
|
||||
}
|
||||
return newApplicationTimeout;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,13 +31,14 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
|||
import javax.crypto.SecretKey;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ipc.CallerContext;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
|
@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto;
|
||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.records.Version;
|
||||
|
@ -237,6 +239,8 @@ public abstract class RMStateStore extends AbstractService {
|
|||
boolean isFenced = false;
|
||||
ApplicationStateData appState =
|
||||
((RMStateUpdateAppEvent) event).getAppState();
|
||||
SettableFuture<Object> result =
|
||||
((RMStateUpdateAppEvent) event).getResult();
|
||||
ApplicationId appId =
|
||||
appState.getApplicationSubmissionContext().getApplicationId();
|
||||
LOG.info("Updating info for app: " + appId);
|
||||
|
@ -246,9 +250,18 @@ public abstract class RMStateStore extends AbstractService {
|
|||
store.notifyApplication(new RMAppEvent(appId,
|
||||
RMAppEventType.APP_UPDATE_SAVED));
|
||||
}
|
||||
|
||||
if (result != null) {
|
||||
result.set(null);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error updating app: " + appId, e);
|
||||
String msg = "Error updating app: " + appId;
|
||||
LOG.error(msg, e);
|
||||
isFenced = store.notifyStoreOperationFailedInternal(e);
|
||||
if (result != null) {
|
||||
result.setException(new YarnException(msg, e));
|
||||
}
|
||||
}
|
||||
return finalState(isFenced);
|
||||
};
|
||||
|
@ -774,18 +787,19 @@ public abstract class RMStateStore extends AbstractService {
|
|||
ApplicationStateData appState =
|
||||
ApplicationStateData.newInstance(app.getSubmitTime(),
|
||||
app.getStartTime(), context, app.getUser(), app.getCallerContext());
|
||||
appState.setApplicationTimeouts(app.getApplicationTimeouts());
|
||||
dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void updateApplicationState(
|
||||
ApplicationStateData appState) {
|
||||
public void updateApplicationState(ApplicationStateData appState) {
|
||||
dispatcher.getEventHandler().handle(new RMStateUpdateAppEvent(appState));
|
||||
}
|
||||
|
||||
public void updateApplicationStateSynchronously(
|
||||
ApplicationStateData appState, boolean notifyApp) {
|
||||
handleStoreEvent(new RMStateUpdateAppEvent(appState, notifyApp));
|
||||
public void updateApplicationStateSynchronously(ApplicationStateData appState,
|
||||
boolean notifyApp, SettableFuture<Object> resultFuture) {
|
||||
handleStoreEvent(
|
||||
new RMStateUpdateAppEvent(appState, notifyApp, resultFuture));
|
||||
}
|
||||
|
||||
public void updateFencedState() {
|
||||
|
|
|
@ -20,21 +20,28 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery;
|
|||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
|
||||
public class RMStateUpdateAppEvent extends RMStateStoreEvent {
|
||||
private final ApplicationStateData appState;
|
||||
// After application state is updated in state store,
|
||||
// should notify back to application or not
|
||||
private boolean notifyApplication;
|
||||
private SettableFuture<Object> future;
|
||||
|
||||
public RMStateUpdateAppEvent(ApplicationStateData appState) {
|
||||
super(RMStateStoreEventType.UPDATE_APP);
|
||||
this.appState = appState;
|
||||
this.notifyApplication = true;
|
||||
this.future = null;
|
||||
}
|
||||
|
||||
public RMStateUpdateAppEvent(ApplicationStateData appState, boolean notifyApp) {
|
||||
this(appState);
|
||||
public RMStateUpdateAppEvent(ApplicationStateData appState, boolean notifyApp,
|
||||
SettableFuture<Object> future) {
|
||||
super(RMStateStoreEventType.UPDATE_APP);
|
||||
this.appState = appState;
|
||||
this.notifyApplication = notifyApp;
|
||||
this.future = future;
|
||||
}
|
||||
|
||||
public ApplicationStateData getAppState() {
|
||||
|
@ -44,4 +51,8 @@ public class RMStateUpdateAppEvent extends RMStateStoreEvent {
|
|||
public boolean isNotifyApplication() {
|
||||
return notifyApplication;
|
||||
}
|
||||
|
||||
public SettableFuture<Object> getResult() {
|
||||
return future;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.hadoop.ipc.CallerContext;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
@ -59,6 +60,25 @@ public abstract class ApplicationStateData {
|
|||
return appState;
|
||||
}
|
||||
|
||||
public static ApplicationStateData newInstance(long submitTime,
|
||||
long startTime, String user,
|
||||
ApplicationSubmissionContext submissionContext, RMAppState state,
|
||||
String diagnostics, long finishTime, CallerContext callerContext,
|
||||
Map<ApplicationTimeoutType, Long> applicationTimeouts) {
|
||||
ApplicationStateData appState =
|
||||
Records.newRecord(ApplicationStateData.class);
|
||||
appState.setSubmitTime(submitTime);
|
||||
appState.setStartTime(startTime);
|
||||
appState.setUser(user);
|
||||
appState.setApplicationSubmissionContext(submissionContext);
|
||||
appState.setState(state);
|
||||
appState.setDiagnostics(diagnostics);
|
||||
appState.setFinishTime(finishTime);
|
||||
appState.setCallerContext(callerContext);
|
||||
appState.setApplicationTimeouts(applicationTimeouts);
|
||||
return appState;
|
||||
}
|
||||
|
||||
public static ApplicationStateData newInstance(long submitTime,
|
||||
long startTime, ApplicationSubmissionContext context, String user,
|
||||
CallerContext callerContext) {
|
||||
|
@ -168,4 +188,11 @@ public abstract class ApplicationStateData {
|
|||
public abstract CallerContext getCallerContext();
|
||||
|
||||
public abstract void setCallerContext(CallerContext callerContext);
|
||||
|
||||
/**
 * Returns the application timeouts held by this record, keyed by timeout
 * type. NOTE(review): callers in this change store absolute expiry times in
 * epoch milliseconds as values - confirm for other callers.
 */
@Public
|
||||
public abstract Map<ApplicationTimeoutType, Long> getApplicationTimeouts();
|
||||

||||
/**
 * Sets the application timeouts held by this record.
 * @param applicationTimeouts timeouts keyed by timeout type.
 */
@Public
|
||||
public abstract void setApplicationTimeouts(
|
||||
Map<ApplicationTimeoutType, Long> applicationTimeouts);
|
||||
}
|
||||
|
|
|
@ -18,10 +18,18 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.ipc.CallerContext;
|
||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationTimeoutMapProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.RMAppStateProto;
|
||||
|
@ -38,6 +46,7 @@ public class ApplicationStateDataPBImpl extends ApplicationStateData {
|
|||
boolean viaProto = false;
|
||||
|
||||
private ApplicationSubmissionContext applicationSubmissionContext = null;
|
||||
private Map<ApplicationTimeoutType, Long> applicationTimeouts = null;
|
||||
|
||||
public ApplicationStateDataPBImpl() {
|
||||
builder = ApplicationStateDataProto.newBuilder();
|
||||
|
@ -63,6 +72,10 @@ public class ApplicationStateDataPBImpl extends ApplicationStateData {
|
|||
((ApplicationSubmissionContextPBImpl)applicationSubmissionContext)
|
||||
.getProto());
|
||||
}
|
||||
|
||||
if (this.applicationTimeouts != null) {
|
||||
addApplicationTimeouts();
|
||||
}
|
||||
}
|
||||
|
||||
private void mergeLocalToProto() {
|
||||
|
@ -256,4 +269,77 @@ public class ApplicationStateDataPBImpl extends ApplicationStateData {
|
|||
public static RMAppState convertFromProtoFormat(RMAppStateProto e) {
|
||||
return RMAppState.valueOf(e.name().replace(RM_APP_PREFIX, ""));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<ApplicationTimeoutType, Long> getApplicationTimeouts() {
|
||||
initApplicationTimeout();
|
||||
return this.applicationTimeouts;
|
||||
}
|
||||
|
||||
private void initApplicationTimeout() {
|
||||
if (this.applicationTimeouts != null) {
|
||||
return;
|
||||
}
|
||||
ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder;
|
||||
List<ApplicationTimeoutMapProto> lists = p.getApplicationTimeoutsList();
|
||||
this.applicationTimeouts =
|
||||
new HashMap<ApplicationTimeoutType, Long>(lists.size());
|
||||
for (ApplicationTimeoutMapProto timeoutProto : lists) {
|
||||
this.applicationTimeouts.put(
|
||||
ProtoUtils
|
||||
.convertFromProtoFormat(timeoutProto.getApplicationTimeoutType()),
|
||||
timeoutProto.getTimeout());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setApplicationTimeouts(
|
||||
Map<ApplicationTimeoutType, Long> appTimeouts) {
|
||||
if (appTimeouts == null) {
|
||||
return;
|
||||
}
|
||||
initApplicationTimeout();
|
||||
this.applicationTimeouts.clear();
|
||||
this.applicationTimeouts.putAll(appTimeouts);
|
||||
}
|
||||
|
||||
private void addApplicationTimeouts() {
|
||||
maybeInitBuilder();
|
||||
builder.clearApplicationTimeouts();
|
||||
if (applicationTimeouts == null) {
|
||||
return;
|
||||
}
|
||||
Iterable<? extends ApplicationTimeoutMapProto> values =
|
||||
new Iterable<ApplicationTimeoutMapProto>() {
|
||||
|
||||
@Override
|
||||
public Iterator<ApplicationTimeoutMapProto> iterator() {
|
||||
return new Iterator<ApplicationTimeoutMapProto>() {
|
||||
private Iterator<ApplicationTimeoutType> iterator =
|
||||
applicationTimeouts.keySet().iterator();
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return iterator.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApplicationTimeoutMapProto next() {
|
||||
ApplicationTimeoutType key = iterator.next();
|
||||
return ApplicationTimeoutMapProto.newBuilder()
|
||||
.setTimeout(applicationTimeouts.get(key))
|
||||
.setApplicationTimeoutType(
|
||||
ProtoUtils.convertToProtoFormat(key))
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
this.builder.addAllApplicationTimeouts(values);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
|
@ -257,4 +258,6 @@ public interface RMApp extends EventHandler<RMAppEvent> {
|
|||
String getAppNodeLabelExpression();
|
||||
|
||||
CallerContext getCallerContext();
|
||||
|
||||
Map<ApplicationTimeoutType, Long> getApplicationTimeouts();
|
||||
}
|
||||
|
|
|
@ -120,6 +120,9 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
|
||||
private static final Log LOG = LogFactory.getLog(RMAppImpl.class);
|
||||
private static final String UNAVAILABLE = "N/A";
|
||||
private static final EnumSet<RMAppState> COMPLETED_APP_STATES =
|
||||
EnumSet.of(RMAppState.FINISHED, RMAppState.FINISHING, RMAppState.FAILED,
|
||||
RMAppState.KILLED, RMAppState.FINAL_SAVING, RMAppState.KILLING);
|
||||
|
||||
// Immutable fields
|
||||
private final ApplicationId applicationId;
|
||||
|
@ -177,6 +180,8 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
private Map<NodeId, List<String>> logAggregationFailureMessagesForNMs =
|
||||
new HashMap<NodeId, List<String>>();
|
||||
private final int maxLogAggregationDiagnosticsInMemory;
|
||||
private Map<ApplicationTimeoutType, Long> applicationTimeouts =
|
||||
new HashMap<ApplicationTimeoutType, Long>();
|
||||
|
||||
// These states stored are only valid when app is at killing or final_saving.
|
||||
private RMAppState stateBeforeKilling;
|
||||
|
@ -830,6 +835,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
this.storedFinishTime = appState.getFinishTime();
|
||||
this.startTime = appState.getStartTime();
|
||||
this.callerContext = appState.getCallerContext();
|
||||
this.applicationTimeouts = appState.getApplicationTimeouts();
|
||||
// If interval > 0, some attempts might have been deleted.
|
||||
if (this.attemptFailuresValidityInterval > 0) {
|
||||
this.firstAttemptIdInStateStore = appState.getFirstAttemptId();
|
||||
|
@ -1024,17 +1030,16 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
}
|
||||
}
|
||||
|
||||
long applicationLifetime =
|
||||
app.getApplicationLifetime(ApplicationTimeoutType.LIFETIME);
|
||||
if (applicationLifetime > 0) {
|
||||
for (Map.Entry<ApplicationTimeoutType, Long> timeout :
|
||||
app.applicationTimeouts.entrySet()) {
|
||||
app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId,
|
||||
ApplicationTimeoutType.LIFETIME, app.submitTime,
|
||||
applicationLifetime * 1000);
|
||||
timeout.getKey(), timeout.getValue());
|
||||
if (LOG.isDebugEnabled()) {
|
||||
long remainingTime = timeout.getValue() - app.systemClock.getTime();
|
||||
LOG.debug("Application " + app.applicationId
|
||||
+ " is registered for timeout monitor, type="
|
||||
+ ApplicationTimeoutType.LIFETIME + " value="
|
||||
+ applicationLifetime + " seconds");
|
||||
+ " is registered for timeout monitor, type=" + timeout.getKey()
|
||||
+ " remaining timeout="
|
||||
+ (remainingTime > 0 ? remainingTime / 1000 : 0) + " seconds");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1150,10 +1155,17 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
long applicationLifetime =
|
||||
app.getApplicationLifetime(ApplicationTimeoutType.LIFETIME);
|
||||
if (applicationLifetime > 0) {
|
||||
// calculate next timeout value
|
||||
Long newTimeout =
|
||||
Long.valueOf(app.submitTime + (applicationLifetime * 1000));
|
||||
app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId,
|
||||
ApplicationTimeoutType.LIFETIME, app.submitTime,
|
||||
applicationLifetime * 1000);
|
||||
LOG.debug("Application " + app.applicationId
|
||||
ApplicationTimeoutType.LIFETIME, newTimeout);
|
||||
|
||||
// update applicationTimeouts with new absolute value.
|
||||
app.applicationTimeouts.put(ApplicationTimeoutType.LIFETIME,
|
||||
newTimeout);
|
||||
|
||||
LOG.info("Application " + app.applicationId
|
||||
+ " is registered for timeout monitor, type="
|
||||
+ ApplicationTimeoutType.LIFETIME + " value=" + applicationLifetime
|
||||
+ " seconds");
|
||||
|
@ -1207,6 +1219,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
ApplicationStateData.newInstance(this.submitTime, this.startTime,
|
||||
this.user, this.submissionContext,
|
||||
stateToBeStored, diags, this.storedFinishTime, this.callerContext);
|
||||
appState.setApplicationTimeouts(this.applicationTimeouts);
|
||||
this.rmContext.getStateStore().updateApplicationState(appState);
|
||||
}
|
||||
|
||||
|
@ -1882,4 +1895,31 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
}
|
||||
return applicationLifetime;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<ApplicationTimeoutType, Long> getApplicationTimeouts() {
|
||||
this.readLock.lock();
|
||||
try {
|
||||
return new HashMap(this.applicationTimeouts);
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void updateApplicationTimeout(
|
||||
Map<ApplicationTimeoutType, Long> updateTimeout) {
|
||||
this.writeLock.lock();
|
||||
try {
|
||||
if (COMPLETED_APP_STATES.contains(getState())) {
|
||||
return;
|
||||
}
|
||||
// update monitoring service
|
||||
this.rmContext.getRMAppLifetimeMonitor()
|
||||
.updateApplicationTimeouts(getApplicationId(), updateTimeout);
|
||||
this.applicationTimeouts.putAll(updateTimeout);
|
||||
|
||||
} finally {
|
||||
this.writeLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -18,9 +18,8 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor;
|
||||
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -33,7 +32,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
|
||||
|
@ -47,12 +45,6 @@ public class RMAppLifetimeMonitor
|
|||
private static final Log LOG = LogFactory.getLog(RMAppLifetimeMonitor.class);
|
||||
|
||||
private RMContext rmContext;
|
||||
private Map<RMAppToMonitor, Long> monitoredApps =
|
||||
new HashMap<RMAppToMonitor, Long>();
|
||||
|
||||
private static final EnumSet<RMAppState> COMPLETED_APP_STATES =
|
||||
EnumSet.of(RMAppState.FINISHED, RMAppState.FINISHING, RMAppState.FAILED,
|
||||
RMAppState.KILLED, RMAppState.FINAL_SAVING, RMAppState.KILLING);
|
||||
|
||||
public RMAppLifetimeMonitor(RMContext rmContext) {
|
||||
super(RMAppLifetimeMonitor.class.getName(), SystemClock.getInstance());
|
||||
|
@ -61,14 +53,16 @@ public class RMAppLifetimeMonitor
|
|||
|
||||
@Override
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
long monitorInterval = conf.getLong(
|
||||
YarnConfiguration.RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS,
|
||||
YarnConfiguration.DEFAULT_RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS);
|
||||
long monitorInterval =
|
||||
conf.getLong(YarnConfiguration.RM_APPLICATION_MONITOR_INTERVAL_MS,
|
||||
YarnConfiguration.DEFAULT_RM_APPLICATION_MONITOR_INTERVAL_MS);
|
||||
if (monitorInterval <= 0) {
|
||||
monitorInterval =
|
||||
YarnConfiguration.DEFAULT_RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS;
|
||||
YarnConfiguration.DEFAULT_RM_APPLICATION_MONITOR_INTERVAL_MS;
|
||||
}
|
||||
setMonitorInterval(monitorInterval);
|
||||
setExpireInterval(0); // No need of expire interval for App.
|
||||
setResetTimeOnStart(false); // do not reset expire time on restart
|
||||
LOG.info("Application lifelime monitor interval set to " + monitorInterval
|
||||
+ " ms.");
|
||||
super.serviceInit(conf);
|
||||
|
@ -77,57 +71,42 @@ public class RMAppLifetimeMonitor
|
|||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
protected synchronized void expire(RMAppToMonitor monitoredAppKey) {
|
||||
Long remove = monitoredApps.remove(monitoredAppKey);
|
||||
ApplicationId appId = monitoredAppKey.getApplicationId();
|
||||
RMApp app = rmContext.getRMApps().get(appId);
|
||||
if (app == null) {
|
||||
return;
|
||||
}
|
||||
// Don't trigger a KILL event if application is in completed states
|
||||
if (!COMPLETED_APP_STATES.contains(app.getState())) {
|
||||
String diagnostics =
|
||||
"Application killed due to exceeding its lifetime period " + remove
|
||||
+ " milliseconds";
|
||||
rmContext.getDispatcher().getEventHandler()
|
||||
.handle(new RMAppEvent(appId, RMAppEventType.KILL, diagnostics));
|
||||
} else {
|
||||
LOG.info("Application " + appId
|
||||
+ " is about to complete. So not killing the application.");
|
||||
}
|
||||
String diagnostics =
|
||||
"Application killed due to exceeding its lifetime period";
|
||||
rmContext.getDispatcher().getEventHandler()
|
||||
.handle(new RMAppEvent(appId, RMAppEventType.KILL, diagnostics));
|
||||
}
|
||||
|
||||
public synchronized void registerApp(ApplicationId appId,
|
||||
ApplicationTimeoutType timeoutType, long monitorStartTime, long timeout) {
|
||||
public void registerApp(ApplicationId appId,
|
||||
ApplicationTimeoutType timeoutType, long expireTime) {
|
||||
RMAppToMonitor appToMonitor = new RMAppToMonitor(appId, timeoutType);
|
||||
register(appToMonitor, monitorStartTime);
|
||||
Long oldTimeout = monitoredApps.get(appToMonitor);
|
||||
if (oldTimeout == null) {
|
||||
monitoredApps.put(appToMonitor, timeout);
|
||||
}
|
||||
register(appToMonitor, expireTime);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected synchronized long getExpireInterval(
|
||||
RMAppToMonitor monitoredAppKey) {
|
||||
return monitoredApps.get(monitoredAppKey);
|
||||
}
|
||||
|
||||
public synchronized void unregisterApp(ApplicationId appId,
|
||||
public void unregisterApp(ApplicationId appId,
|
||||
ApplicationTimeoutType timeoutType) {
|
||||
RMAppToMonitor appToRemove = new RMAppToMonitor(appId, timeoutType);
|
||||
unregister(appToRemove);
|
||||
monitoredApps.remove(appToRemove);
|
||||
RMAppToMonitor remove = new RMAppToMonitor(appId, timeoutType);
|
||||
unregister(remove);
|
||||
}
|
||||
|
||||
public synchronized void unregisterApp(ApplicationId appId,
|
||||
Set<ApplicationTimeoutType> types) {
|
||||
for (ApplicationTimeoutType type : types) {
|
||||
unregisterApp(appId, type);
|
||||
public void unregisterApp(ApplicationId appId,
|
||||
Set<ApplicationTimeoutType> timeoutTypes) {
|
||||
for (ApplicationTimeoutType timeoutType : timeoutTypes) {
|
||||
unregisterApp(appId, timeoutType);
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void updateApplicationTimeouts(ApplicationId appId,
|
||||
public void updateApplicationTimeouts(ApplicationId appId,
|
||||
Map<ApplicationTimeoutType, Long> timeouts) {
|
||||
// TODO in YARN-5611
|
||||
for (Entry<ApplicationTimeoutType, Long> entry : timeouts.entrySet()) {
|
||||
ApplicationTimeoutType timeoutType = entry.getKey();
|
||||
RMAppToMonitor update = new RMAppToMonitor(appId, timeoutType);
|
||||
register(update, entry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -2419,8 +2419,9 @@ public class CapacityScheduler extends
|
|||
ApplicationStateData.newInstance(rmApp.getSubmitTime(),
|
||||
rmApp.getStartTime(), rmApp.getApplicationSubmissionContext(),
|
||||
rmApp.getUser(), rmApp.getCallerContext());
|
||||
appState.setApplicationTimeouts(rmApp.getApplicationTimeouts());
|
||||
rmContext.getStateStore().updateApplicationStateSynchronously(appState,
|
||||
false);
|
||||
false, null);
|
||||
|
||||
// As we use iterator over a TreeSet for OrderingPolicy, once we change
|
||||
// priority then reinsert back to make order correct.
|
||||
|
|
|
@ -69,6 +69,7 @@ message ApplicationStateDataProto {
|
|||
optional string diagnostics = 6 [default = "N/A"];
|
||||
optional int64 finish_time = 7;
|
||||
optional hadoop.common.RPCCallerContextProto caller_context = 8;
|
||||
repeated ApplicationTimeoutMapProto application_timeouts = 9;
|
||||
}
|
||||
|
||||
message ApplicationAttemptStateDataProto {
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
|
@ -217,6 +218,11 @@ public abstract class MockAsm extends MockApps {
|
|||
public CallerContext getCallerContext() {
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<ApplicationTimeoutType, Long> getApplicationTimeouts() {
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
}
|
||||
}
|
||||
|
||||
public static RMApp newApplication(int i) {
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
|
@ -301,4 +302,9 @@ public class MockRMApp implements RMApp {
|
|||
public CallerContext getCallerContext() {
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<ApplicationTimeoutType, Long> getApplicationTimeouts() {
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
@ -27,11 +28,14 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
|
@ -39,8 +43,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.TestWorkPreservingRMRestart;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
|
||||
import org.apache.hadoop.yarn.util.Times;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
@ -60,15 +66,11 @@ public class TestApplicationLifetimeMonitor {
|
|||
Logger rootLogger = LogManager.getRootLogger();
|
||||
rootLogger.setLevel(Level.DEBUG);
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
|
||||
true);
|
||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||
conf.setLong(YarnConfiguration.RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS,
|
||||
conf.setLong(YarnConfiguration.RM_APPLICATION_MONITOR_INTERVAL_MS,
|
||||
3000L);
|
||||
}
|
||||
|
||||
@Test(timeout = 90000)
|
||||
@Test(timeout = 60000)
|
||||
public void testApplicationLifetimeMonitor() throws Exception {
|
||||
MockRM rm = null;
|
||||
try {
|
||||
|
@ -81,22 +83,64 @@ public class TestApplicationLifetimeMonitor {
|
|||
new HashMap<ApplicationTimeoutType, Long>();
|
||||
timeouts.put(ApplicationTimeoutType.LIFETIME, 10L);
|
||||
RMApp app1 = rm.submitApp(1024, appPriority, timeouts);
|
||||
|
||||
// 20L seconds
|
||||
timeouts.put(ApplicationTimeoutType.LIFETIME, 20L);
|
||||
RMApp app2 = rm.submitApp(1024, appPriority, timeouts);
|
||||
|
||||
nm1.nodeHeartbeat(true);
|
||||
// Send launch Event
|
||||
MockAM am1 =
|
||||
rm.sendAMLaunched(app1.getCurrentAppAttempt().getAppAttemptId());
|
||||
am1.registerAppAttempt();
|
||||
rm.waitForState(app1.getApplicationId(), RMAppState.KILLED);
|
||||
Assert.assertTrue("Applicaiton killed before lifetime value",
|
||||
Assert.assertTrue("Application killed before lifetime value",
|
||||
(System.currentTimeMillis() - app1.getSubmitTime()) > 10000);
|
||||
|
||||
Map<ApplicationTimeoutType, String> updateTimeout =
|
||||
new HashMap<ApplicationTimeoutType, String>();
|
||||
long newLifetime = 10L;
|
||||
// update 10L seconds more to timeout
|
||||
updateTimeout.put(ApplicationTimeoutType.LIFETIME,
|
||||
Times.formatISO8601(System.currentTimeMillis() + newLifetime * 1000));
|
||||
UpdateApplicationTimeoutsRequest request =
|
||||
UpdateApplicationTimeoutsRequest.newInstance(app2.getApplicationId(),
|
||||
updateTimeout);
|
||||
|
||||
Map<ApplicationTimeoutType, Long> applicationTimeouts =
|
||||
app2.getApplicationTimeouts();
|
||||
// has old timeout time
|
||||
long beforeUpdate =
|
||||
applicationTimeouts.get(ApplicationTimeoutType.LIFETIME);
|
||||
|
||||
// update app2 lifetime to new time i.e now + timeout
|
||||
rm.getRMContext().getClientRMService().updateApplicationTimeouts(request);
|
||||
|
||||
applicationTimeouts =
|
||||
app2.getApplicationTimeouts();
|
||||
long afterUpdate =
|
||||
applicationTimeouts.get(ApplicationTimeoutType.LIFETIME);
|
||||
|
||||
Assert.assertTrue("Application lifetime value not updated",
|
||||
afterUpdate > beforeUpdate);
|
||||
|
||||
rm.waitForState(app2.getApplicationId(), RMAppState.KILLED);
|
||||
// verify for app killed with updated lifetime
|
||||
Assert.assertTrue("Application killed before lifetime value",
|
||||
app2.getFinishTime() > afterUpdate);
|
||||
|
||||
} finally {
|
||||
stopRM(rm);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
@Test(timeout = 180000)
|
||||
public void testApplicationLifetimeOnRMRestart() throws Exception {
|
||||
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
|
||||
true);
|
||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
MockRM rm1 = new MockRM(conf, memStore);
|
||||
|
@ -115,6 +159,12 @@ public class TestApplicationLifetimeMonitor {
|
|||
|
||||
// Re-start RM
|
||||
MockRM rm2 = new MockRM(conf, memStore);
|
||||
|
||||
// make sure app has been unregistered with old RM else both will trigger
|
||||
// Expire event
|
||||
rm1.getRMContext().getRMAppLifetimeMonitor().unregisterApp(
|
||||
app1.getApplicationId(), ApplicationTimeoutType.LIFETIME);
|
||||
|
||||
rm2.start();
|
||||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||
|
||||
|
@ -152,9 +202,87 @@ public class TestApplicationLifetimeMonitor {
|
|||
|
||||
// wait for app life time and application to be in killed state.
|
||||
rm2.waitForState(recoveredApp1.getApplicationId(), RMAppState.KILLED);
|
||||
Assert.assertTrue("Applicaiton killed before lifetime value",
|
||||
(System.currentTimeMillis()
|
||||
- recoveredApp1.getSubmitTime()) > appLifetime);
|
||||
Assert.assertTrue("Application killed before lifetime value",
|
||||
recoveredApp1.getFinishTime() > (recoveredApp1.getSubmitTime()
|
||||
+ appLifetime * 1000));
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testUpdateApplicationTimeoutForStateStoreUpdateFail()
|
||||
throws Exception {
|
||||
MockRM rm1 = null;
|
||||
try {
|
||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore() {
|
||||
private int count = 0;
|
||||
|
||||
@Override
|
||||
public synchronized void updateApplicationStateInternal(
|
||||
ApplicationId appId, ApplicationStateData appState)
|
||||
throws Exception {
|
||||
// fail only 1 time.
|
||||
if (count++ == 0) {
|
||||
throw new Exception("State-store update failed");
|
||||
}
|
||||
super.updateApplicationStateInternal(appId, appState);
|
||||
}
|
||||
};
|
||||
memStore.init(conf);
|
||||
rm1 = new MockRM(conf, memStore);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
nm1.nodeHeartbeat(true);
|
||||
|
||||
long appLifetime = 30L;
|
||||
Map<ApplicationTimeoutType, Long> timeouts =
|
||||
new HashMap<ApplicationTimeoutType, Long>();
|
||||
timeouts.put(ApplicationTimeoutType.LIFETIME, appLifetime);
|
||||
RMApp app1 = rm1.submitApp(200, Priority.newInstance(0), timeouts);
|
||||
|
||||
Map<ApplicationTimeoutType, String> updateTimeout =
|
||||
new HashMap<ApplicationTimeoutType, String>();
|
||||
long newLifetime = 10L;
|
||||
// update 10L seconds more to timeout i.e 30L seconds overall
|
||||
updateTimeout.put(ApplicationTimeoutType.LIFETIME,
|
||||
Times.formatISO8601(System.currentTimeMillis() + newLifetime * 1000));
|
||||
UpdateApplicationTimeoutsRequest request =
|
||||
UpdateApplicationTimeoutsRequest.newInstance(app1.getApplicationId(),
|
||||
updateTimeout);
|
||||
|
||||
Map<ApplicationTimeoutType, Long> applicationTimeouts =
|
||||
app1.getApplicationTimeouts();
|
||||
// has old timeout time
|
||||
long beforeUpdate =
|
||||
applicationTimeouts.get(ApplicationTimeoutType.LIFETIME);
|
||||
|
||||
try {
|
||||
// update app2 lifetime to new time i.e now + timeout
|
||||
rm1.getRMContext().getClientRMService()
|
||||
.updateApplicationTimeouts(request);
|
||||
fail("Update application should fail.");
|
||||
} catch (YarnException e) {
|
||||
// expected
|
||||
assertTrue("State-store exception does not containe appId",
|
||||
e.getMessage().contains(app1.getApplicationId().toString()));
|
||||
}
|
||||
|
||||
applicationTimeouts = app1.getApplicationTimeouts();
|
||||
// has old timeout time
|
||||
long afterUpdate =
|
||||
applicationTimeouts.get(ApplicationTimeoutType.LIFETIME);
|
||||
|
||||
Assert.assertEquals("Application timeout is updated", beforeUpdate,
|
||||
afterUpdate);
|
||||
rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
|
||||
// verify for app killed with updated lifetime
|
||||
Assert.assertTrue("Application killed before lifetime value",
|
||||
app1.getFinishTime() > afterUpdate);
|
||||
} finally {
|
||||
stopRM(rm1);
|
||||
}
|
||||
}
|
||||
|
||||
private void stopRM(MockRM rm) {
|
||||
|
|
Loading…
Reference in New Issue