YARN-5977. ContainerManagementProtocol changes to support change of container ExecutionType. (Kartheek Muthyala via asuresh)

This commit is contained in:
Arun Suresh 2017-08-03 21:15:40 -07:00
parent f4c6b00a9f
commit 35dc782923
23 changed files with 834 additions and 79 deletions

View File

@ -31,6 +31,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
@ -462,6 +464,7 @@ public StopContainersResponse stopContainers(StopContainersRequest request)
}
@Override
@Deprecated
public IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest request) throws IOException,
IOException {
@ -506,5 +509,11 @@ public CommitResponse commitLastReInitialization(ContainerId containerId)
throws YarnException, IOException {
return null;
}
@Override
public ContainerUpdateResponse updateContainer(ContainerUpdateRequest
request) throws YarnException, IOException {
return null;
}
}
}

View File

@ -47,6 +47,8 @@
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
@ -465,6 +467,7 @@ public GetContainerStatusesResponse getContainerStatuses(
}
@Override
@Deprecated
public IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest request) throws YarnException,
IOException {
@ -511,6 +514,12 @@ public CommitResponse commitLastReInitialization(ContainerId containerId)
throws YarnException, IOException {
return null;
}
@Override
public ContainerUpdateResponse updateContainer(ContainerUpdateRequest
request) throws YarnException, IOException {
return null;
}
}
@SuppressWarnings("serial")

View File

@ -24,6 +24,8 @@
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
@ -196,10 +198,31 @@ GetContainerStatusesResponse getContainerStatuses(
*/
@Public
@Unstable
@Deprecated
IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest request) throws YarnException,
IOException;
/**
* <p>
* The API used by the <code>ApplicationMaster</code> to request for
* resource update of running containers on the <code>NodeManager</code>.
* </p>
*
* @param request
* request to update resource of a list of containers
* @return response which includes a list of containerIds of containers
* whose resource has been successfully updated and a
* containerId-to-exception map for failed requests.
*
* @throws YarnException Exception specific to YARN
* @throws IOException IOException thrown from NodeManager
*/
@Public
@Unstable
ContainerUpdateResponse updateContainer(ContainerUpdateRequest request)
throws YarnException, IOException;
SignalContainerResponse signalToContainer(SignalContainerRequest request)
throws YarnException, IOException;

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.util.Records;
import java.util.List;
/**
* <p>The request sent by <code>Application Master</code> to the
* <code>Node Manager</code> to change the resource quota of a container.</p>
*
* @see ContainerManagementProtocol#updateContainer(ContainerUpdateRequest)
*/
@Public
@Unstable
public abstract class ContainerUpdateRequest {
@Public
@Unstable
public static ContainerUpdateRequest newInstance(
List<Token> containersToIncrease) {
ContainerUpdateRequest request =
Records.newRecord(ContainerUpdateRequest.class);
request.setContainersToUpdate(containersToIncrease);
return request;
}
/**
* Get a list of container tokens to be used for authorization during
* container resource update.
* <p>
* Note: {@link NMToken} will be used for authenticating communication with
* {@code NodeManager}.
* @return the list of container tokens to be used for authorization during
* container resource update.
* @see NMToken
*/
@Public
@Unstable
public abstract List<Token> getContainersToUpdate();
/**
* Set container tokens to be used during container resource increase.
* The token is acquired from
* <code>AllocateResponse.getUpdatedContainers</code>.
* The token contains the container id and resource capability required for
* container resource update.
* @param containersToUpdate the list of container tokens to be used
* for container resource increase.
*/
@Public
@Unstable
public abstract void setContainersToUpdate(
List<Token> containersToUpdate);
}

View File

@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.util.Records;
import java.util.List;
import java.util.Map;
/**
* <p>
* The response sent by the <code>NodeManager</code> to the
* <code>ApplicationMaster</code> when asked to update container resource.
* </p>
*
* @see ContainerManagementProtocol#updateContainer(ContainerUpdateRequest)
*/
@Public
@Unstable
public abstract class ContainerUpdateResponse {
public static ContainerUpdateResponse newInstance(
List<ContainerId> successfullyUpdatedContainers,
Map<ContainerId, SerializedException> failedRequests) {
ContainerUpdateResponse response =
Records.newRecord(ContainerUpdateResponse.class);
response.setSuccessfullyUpdatedContainers(
successfullyUpdatedContainers);
response.setFailedRequests(failedRequests);
return response;
}
/**
* Get the list of containerIds of containers whose resource
* have been successfully update.
*
* @return the list of containerIds of containers whose resource have
* been successfully updated.
*/
@Public
@Unstable
public abstract List<ContainerId> getSuccessfullyUpdatedContainers();
/**
* Set the list of containerIds of containers whose resource have
* been successfully updated.
* @param succeedUpdatedContainers Containers whose update request were
* successfully completed.
*/
@Private
@Unstable
public abstract void setSuccessfullyUpdatedContainers(
List<ContainerId> succeedUpdatedContainers);
/**
* Get the containerId-to-exception map in which the exception indicates
* error from each container for failed requests.
* @return map of containerId-to-exception
*/
@Public
@Unstable
public abstract Map<ContainerId, SerializedException> getFailedRequests();
/**
* Set the containerId-to-exception map in which the exception indicates
* error from each container for failed requests.
* @param failedRequests Containers whose update request were failed
*/
@Private
@Unstable
public abstract void setFailedRequests(
Map<ContainerId, SerializedException> failedRequests);
}

View File

@ -36,6 +36,7 @@ service ContainerManagementProtocolService {
rpc stopContainers(StopContainersRequestProto) returns (StopContainersResponseProto);
rpc getContainerStatuses(GetContainerStatusesRequestProto) returns (GetContainerStatusesResponseProto);
rpc increaseContainersResource(IncreaseContainersResourceRequestProto) returns (IncreaseContainersResourceResponseProto);
rpc updateContainer(ContainerUpdateRequestProto) returns (ContainerUpdateResponseProto);
rpc signalToContainer(SignalContainerRequestProto) returns (SignalContainerResponseProto);
rpc localize(ResourceLocalizationRequestProto) returns (ResourceLocalizationResponseProto);

View File

@ -368,6 +368,15 @@ message IncreaseContainersResourceResponseProto {
repeated ContainerExceptionMapProto failed_requests = 2;
}
message ContainerUpdateRequestProto {
repeated hadoop.common.TokenProto update_container_token = 1;
}
message ContainerUpdateResponseProto {
repeated ContainerIdProto succeeded_requests = 1;
repeated ContainerExceptionMapProto failed_requests = 2;
}
//////////////////////////////////////////////////////
/////// Application_History_Protocol /////////////////
//////////////////////////////////////////////////////

View File

@ -34,17 +34,17 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@ -239,12 +239,12 @@ public void increaseContainerResource(Container container)
container.getNodeId().toString(), container.getId());
List<Token> increaseTokens = new ArrayList<>();
increaseTokens.add(container.getContainerToken());
IncreaseContainersResourceRequest increaseRequest =
IncreaseContainersResourceRequest
.newInstance(increaseTokens);
IncreaseContainersResourceResponse response =
proxy.getContainerManagementProtocol()
.increaseContainersResource(increaseRequest);
ContainerUpdateRequest request =
ContainerUpdateRequest.newInstance(increaseTokens);
ContainerUpdateResponse response =
proxy.getContainerManagementProtocol().updateContainer(request);
if (response.getFailedRequests() != null
&& response.getFailedRequests().containsKey(container.getId())) {
Throwable t = response.getFailedRequests().get(container.getId())

View File

@ -28,6 +28,8 @@
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
@ -45,10 +47,10 @@
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CommitResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ContainerUpdateRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ContainerUpdateResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReInitializeContainerRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReInitializeContainerResponsePBImpl;
@ -56,8 +58,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ResourceLocalizationResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RestartContainerResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
.RollbackResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RollbackResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl;
@ -71,8 +72,8 @@
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ContainerUpdateRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ResourceLocalizationRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto;
@ -161,14 +162,35 @@ public GetContainerStatusesResponse getContainerStatuses(
}
@Override
@Deprecated
public IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest request) throws YarnException,
IOException {
IncreaseContainersResourceRequestProto requestProto =
((IncreaseContainersResourceRequestPBImpl)request).getProto();
try {
return new IncreaseContainersResourceResponsePBImpl(
proxy.increaseContainersResource(null, requestProto));
ContainerUpdateRequest req =
ContainerUpdateRequest.newInstance(request.getContainersToIncrease());
ContainerUpdateRequestProto reqProto =
((ContainerUpdateRequestPBImpl) req).getProto();
ContainerUpdateResponse resp = new ContainerUpdateResponsePBImpl(
proxy.updateContainer(null, reqProto));
return IncreaseContainersResourceResponse
.newInstance(resp.getSuccessfullyUpdatedContainers(),
resp.getFailedRequests());
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
return null;
}
}
@Override
public ContainerUpdateResponse updateContainer(ContainerUpdateRequest
request) throws YarnException, IOException {
ContainerUpdateRequestProto requestProto =
((ContainerUpdateRequestPBImpl)request).getProto();
try {
return new ContainerUpdateResponsePBImpl(
proxy.updateContainer(null, requestProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
return null;

View File

@ -24,6 +24,8 @@
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerResponse;
@ -34,6 +36,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CommitResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ContainerUpdateRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ContainerUpdateResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl;
@ -74,6 +78,8 @@
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.CommitResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ContainerUpdateRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ContainerUpdateResponseProto;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@ -137,8 +143,12 @@ public IncreaseContainersResourceResponseProto increaseContainersResource(
IncreaseContainersResourceRequestPBImpl request =
new IncreaseContainersResourceRequestPBImpl(proto);
try {
ContainerUpdateResponse resp = real.updateContainer(ContainerUpdateRequest
.newInstance(request.getContainersToIncrease()));
IncreaseContainersResourceResponse response =
real.increaseContainersResource(request);
IncreaseContainersResourceResponse
.newInstance(resp.getSuccessfullyUpdatedContainers(),
resp.getFailedRequests());
return ((IncreaseContainersResourceResponsePBImpl)response).getProto();
} catch (YarnException e) {
throw new ServiceException(e);
@ -147,6 +157,22 @@ public IncreaseContainersResourceResponseProto increaseContainersResource(
}
}
@Override
public ContainerUpdateResponseProto updateContainer(
RpcController controller, ContainerUpdateRequestProto proto)
throws ServiceException {
ContainerUpdateRequestPBImpl request =
new ContainerUpdateRequestPBImpl(proto);
try {
ContainerUpdateResponse response = real.updateContainer(request);
return ((ContainerUpdateResponsePBImpl)response).getProto();
} catch (YarnException e) {
throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public SignalContainerResponseProto signalToContainer(RpcController arg0,
SignalContainerRequestProto proto) throws ServiceException {

View File

@ -0,0 +1,171 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import com.google.protobuf.TextFormat;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ContainerUpdateRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ContainerUpdateRequestProtoOrBuilder;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
* <p>An implementation of <code>ContainerUpdateRequest</code>.</p>
*
* @see ContainerUpdateRequest
*/
@Private
@Unstable
public class ContainerUpdateRequestPBImpl extends ContainerUpdateRequest {
private ContainerUpdateRequestProto proto =
ContainerUpdateRequestProto.getDefaultInstance();
private ContainerUpdateRequestProto.Builder builder = null;
private boolean viaProto = false;
private List<Token> containersToUpdate = null;
public ContainerUpdateRequestPBImpl() {
builder = ContainerUpdateRequestProto.newBuilder();
}
public ContainerUpdateRequestPBImpl(ContainerUpdateRequestProto proto) {
this.proto = proto;
viaProto = true;
}
@Override
public List<Token> getContainersToUpdate() {
if (containersToUpdate != null) {
return containersToUpdate;
}
ContainerUpdateRequestProtoOrBuilder p = viaProto ? proto : builder;
List<TokenProto> list = p.getUpdateContainerTokenList();
containersToUpdate = new ArrayList<>();
for (TokenProto c : list) {
containersToUpdate.add(convertFromProtoFormat(c));
}
return containersToUpdate;
}
@Override
public void setContainersToUpdate(List<Token> containersToUpdate) {
maybeInitBuilder();
if (containersToUpdate == null) {
builder.clearUpdateContainerToken();
}
this.containersToUpdate = containersToUpdate;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null) {
return false;
}
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
public ContainerUpdateRequestProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private Token convertFromProtoFormat(TokenProto p) {
return new TokenPBImpl(p);
}
private TokenProto convertToProtoFormat(Token t) {
return ((TokenPBImpl) t).getProto();
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = ContainerUpdateRequestProto.newBuilder(proto);
}
viaProto = false;
}
private void mergeLocalToBuilder() {
if (this.containersToUpdate != null) {
addUpdateContainersToProto();
}
}
private void mergeLocalToProto() {
if (viaProto) {
maybeInitBuilder();
}
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void addUpdateContainersToProto() {
maybeInitBuilder();
builder.clearUpdateContainerToken();
if (this.containersToUpdate == null) {
return;
}
Iterable<TokenProto> iterable = new Iterable<TokenProto>() {
@Override
public Iterator<TokenProto> iterator() {
return new Iterator<TokenProto>() {
private Iterator<Token> iter = containersToUpdate.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public TokenProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllUpdateContainerToken(iterable);
}
}

View File

@ -0,0 +1,241 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import com.google.protobuf.TextFormat;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.SerializedExceptionProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ContainerExceptionMapProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ContainerUpdateResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ContainerUpdateResponseProtoOrBuilder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
* <p>An implementation of <code>ContainerUpdateResponse</code>.</p>
*
* @see ContainerUpdateResponse
*/
@Private
@Unstable
public class ContainerUpdateResponsePBImpl extends ContainerUpdateResponse {
private ContainerUpdateResponseProto proto =
ContainerUpdateResponseProto.getDefaultInstance();
private ContainerUpdateResponseProto.Builder builder = null;
private boolean viaProto = false;
private List<ContainerId> succeededRequests = null;
private Map<ContainerId, SerializedException> failedRequests = null;
public ContainerUpdateResponsePBImpl() {
builder = ContainerUpdateResponseProto.newBuilder();
}
public ContainerUpdateResponsePBImpl(ContainerUpdateResponseProto proto) {
this.proto = proto;
viaProto = true;
}
@Override
public List<ContainerId> getSuccessfullyUpdatedContainers() {
initSucceededRequests();
return this.succeededRequests;
}
@Override
public void setSuccessfullyUpdatedContainers(List<ContainerId> succeeded) {
maybeInitBuilder();
if (succeeded == null) {
builder.clearSucceededRequests();
}
this.succeededRequests = succeeded;
}
@Override
public Map<ContainerId, SerializedException> getFailedRequests() {
initFailedRequests();
return this.failedRequests;
}
@Override
public void setFailedRequests(
Map<ContainerId, SerializedException> failedRequests) {
maybeInitBuilder();
if (failedRequests == null) {
builder.clearFailedRequests();
}
this.failedRequests = failedRequests;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null) {
return false;
}
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
public ContainerUpdateResponseProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void initSucceededRequests() {
if (this.succeededRequests != null) {
return;
}
ContainerUpdateResponseProtoOrBuilder p = viaProto ? proto : builder;
List<ContainerIdProto> list = p.getSucceededRequestsList();
this.succeededRequests = new ArrayList<ContainerId>();
for (ContainerIdProto c : list) {
this.succeededRequests.add(convertFromProtoFormat(c));
}
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = ContainerUpdateResponseProto.newBuilder(proto);
}
viaProto = false;
}
private void initFailedRequests() {
if (this.failedRequests != null) {
return;
}
ContainerUpdateResponseProtoOrBuilder p = viaProto ? proto : builder;
List<ContainerExceptionMapProto> protoList = p.getFailedRequestsList();
this.failedRequests = new HashMap<ContainerId, SerializedException>();
for (ContainerExceptionMapProto ce : protoList) {
this.failedRequests.put(convertFromProtoFormat(ce.getContainerId()),
convertFromProtoFormat(ce.getException()));
}
}
private void mergeLocalToBuilder() {
if (this.succeededRequests != null) {
addSucceededRequestsToProto();
}
if (this.failedRequests != null) {
addFailedRequestsToProto();
}
}
private void mergeLocalToProto() {
if (viaProto) {
maybeInitBuilder();
}
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void addSucceededRequestsToProto() {
maybeInitBuilder();
builder.clearSucceededRequests();
if (this.succeededRequests == null) {
return;
}
Iterable<ContainerIdProto> iterable = new Iterable<ContainerIdProto>() {
@Override
public Iterator<ContainerIdProto> iterator() {
return new Iterator<ContainerIdProto>() {
private Iterator<ContainerId> iter = succeededRequests.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public ContainerIdProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllSucceededRequests(iterable);
}
private void addFailedRequestsToProto() {
maybeInitBuilder();
builder.clearFailedRequests();
if (this.failedRequests == null) {
return;
}
List<ContainerExceptionMapProto> protoList =
new ArrayList<ContainerExceptionMapProto>();
for (Map.Entry<ContainerId, SerializedException> entry : this.failedRequests
.entrySet()) {
protoList.add(ContainerExceptionMapProto.newBuilder()
.setContainerId(convertToProtoFormat(entry.getKey()))
.setException(convertToProtoFormat(entry.getValue())).build());
}
builder.addAllFailedRequests(protoList);
}
private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
return new ContainerIdPBImpl(p);
}
private ContainerIdProto convertToProtoFormat(ContainerId t) {
return ((ContainerIdPBImpl) t).getProto();
}
private SerializedExceptionPBImpl convertFromProtoFormat(
SerializedExceptionProto p) {
return new SerializedExceptionPBImpl(p);
}
private SerializedExceptionProto convertToProtoFormat(SerializedException t) {
return ((SerializedExceptionPBImpl) t).getProto();
}
}

View File

@ -33,6 +33,8 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
@ -193,6 +195,7 @@ public GetContainerStatusesResponse getContainerStatuses(
}
@Override
@Deprecated
public IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest request) throws YarnException, IOException {
return null;
@ -236,5 +239,11 @@ public CommitResponse commitLastReInitialization(ContainerId containerId)
throws YarnException, IOException {
return null;
}
@Override
public ContainerUpdateResponse updateContainer(ContainerUpdateRequest
request) throws YarnException, IOException {
return null;
}
}
}

View File

@ -27,6 +27,8 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
@ -111,11 +113,11 @@ private void testRPCTimeout(String rpcClass) throws Exception {
// Construct container resource increase request,
List<Token> increaseTokens = new ArrayList<>();
increaseTokens.add(containerToken);
IncreaseContainersResourceRequest increaseRequest =
IncreaseContainersResourceRequest
.newInstance(increaseTokens);
ContainerUpdateRequest request = ContainerUpdateRequest
.newInstance(increaseTokens);
try {
proxy.increaseContainersResource(increaseRequest);
proxy.updateContainer(request);
} catch (Exception e) {
LOG.info(StringUtils.stringifyException(e));
Assert.assertEquals("Error, exception is not: "
@ -170,8 +172,16 @@ public GetContainerStatusesResponse getContainerStatuses(
}
@Override
@Deprecated
public IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest request) throws YarnException, IOException {
IncreaseContainersResourceRequest request)
throws YarnException, IOException {
return null;
}
@Override
public ContainerUpdateResponse updateContainer(ContainerUpdateRequest
request) throws YarnException, IOException {
try {
// make the thread sleep to look like its not going to respond
Thread.sleep(10000);

View File

@ -27,6 +27,8 @@
import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenResponseProto;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
@ -379,6 +381,8 @@ public static void setup() throws Exception {
generateByNewInstance(StartContainerRequest.class);
generateByNewInstance(NodeLabel.class);
generateByNewInstance(UpdatedContainer.class);
generateByNewInstance(ContainerUpdateRequest.class);
generateByNewInstance(ContainerUpdateResponse.class);
// genByNewInstance does not apply to QueueInfo, cause
// it is recursive(has sub queues)
typeValueCache.put(QueueInfo.class, QueueInfo.newInstance("root", 1.0f,

View File

@ -34,6 +34,8 @@
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
@ -341,6 +343,7 @@ public StopContainersResponse stopContainers(StopContainersRequest request)
}
@Override
@Deprecated
public IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest request)
throws YarnException, IOException {
@ -385,6 +388,12 @@ public CommitResponse commitLastReInitialization(ContainerId containerId)
throws YarnException, IOException {
return null;
}
@Override
public ContainerUpdateResponse updateContainer(ContainerUpdateRequest
request) throws YarnException, IOException {
return null;
}
}
public static ContainerTokenIdentifier newContainerTokenIdentifier(

View File

@ -39,6 +39,8 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
@ -1133,13 +1135,26 @@ protected ContainerTokenIdentifier verifyAndGetContainerTokenIdentifier(
* Increase resource of a list of containers on this NodeManager.
*/
@Override
@Deprecated
public IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest requests)
throws YarnException, IOException {
ContainerUpdateResponse resp = updateContainer(
ContainerUpdateRequest.newInstance(requests.getContainersToIncrease()));
return IncreaseContainersResourceResponse.newInstance(
resp.getSuccessfullyUpdatedContainers(), resp.getFailedRequests());
}
/**
* Update resource of a list of containers on this NodeManager.
*/
@Override
public ContainerUpdateResponse updateContainer(ContainerUpdateRequest
request) throws YarnException, IOException {
UserGroupInformation remoteUgi = getRemoteUgi();
NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
authorizeUser(remoteUgi, nmTokenIdentifier);
List<ContainerId> successfullyIncreasedContainers
List<ContainerId> successfullyUpdatedContainers
= new ArrayList<ContainerId>();
Map<ContainerId, SerializedException> failedContainers =
new HashMap<ContainerId, SerializedException>();
@ -1151,7 +1166,7 @@ public IncreaseContainersResourceResponse increaseContainersResource(
synchronized (this.context) {
// Process container resource increase requests
for (org.apache.hadoop.yarn.api.records.Token token :
requests.getContainersToIncrease()) {
request.getContainersToUpdate()) {
ContainerId containerId = null;
try {
if (token.getIdentifier() == null) {
@ -1171,7 +1186,7 @@ public IncreaseContainersResourceResponse increaseContainersResource(
Resource resource = containerTokenIdentifier.getResource();
changeContainerResourceInternal(containerId,
containerTokenIdentifier.getVersion(), resource, true);
successfullyIncreasedContainers.add(containerId);
successfullyUpdatedContainers.add(containerId);
} catch (YarnException | InvalidToken e) {
failedContainers.put(containerId, SerializedException.newInstance(e));
} catch (IOException e) {
@ -1179,8 +1194,8 @@ public IncreaseContainersResourceResponse increaseContainersResource(
}
}
}
return IncreaseContainersResourceResponse.newInstance(
successfullyIncreasedContainers, failedContainers);
return ContainerUpdateResponse.newInstance(
successfullyUpdatedContainers, failedContainers);
}
@SuppressWarnings("unchecked")

View File

@ -45,9 +45,9 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -224,7 +224,7 @@ public void testContainerResourceIncreaseIsSynchronizedWithRMResync()
// Start a container and make sure it is in RUNNING state
((TestNodeManager4)nm).startContainer();
// Simulate a container resource increase in a separate thread
((TestNodeManager4)nm).increaseContainersResource();
((TestNodeManager4)nm).updateContainerResource();
// Simulate RM restart by sending a RESYNC event
LOG.info("Sending out RESYNC event");
nm.getNMDispatcher().getEventHandler().handle(
@ -505,7 +505,7 @@ protected void registerWithRM() throws YarnException, IOException {
class TestNodeManager4 extends NodeManager {
private Thread increaseContainerResourceThread = null;
private Thread containerUpdateResourceThread = null;
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
@ -621,11 +621,11 @@ public void startContainer()
}
// Increase container resource in a thread
public void increaseContainersResource()
public void updateContainerResource()
throws InterruptedException {
LOG.info("Increase a container resource in a separate thread");
increaseContainerResourceThread = new IncreaseContainersResourceThread();
increaseContainerResourceThread.start();
containerUpdateResourceThread = new ContainerUpdateResourceThread();
containerUpdateResourceThread.start();
}
class TestNodeStatusUpdaterImpl4 extends MockNodeStatusUpdater {
@ -652,7 +652,7 @@ protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
updateBarrier.await();
// Call the actual rebootNodeStatusUpdaterAndRegisterWithRM().
// This function should be synchronized with
// increaseContainersResource().
// updateContainer().
updateBarrier.await();
super.rebootNodeStatusUpdaterAndRegisterWithRM();
// Check status after registerWithRM
@ -672,7 +672,7 @@ protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
}
}
class IncreaseContainersResourceThread extends Thread {
class ContainerUpdateResourceThread extends Thread {
@Override
public void run() {
// Construct container resource increase request
@ -683,15 +683,15 @@ public void run() {
try {
updateBarrier.await();
increaseTokens.add(getContainerToken(targetResource));
IncreaseContainersResourceRequest increaseRequest =
IncreaseContainersResourceRequest.newInstance(increaseTokens);
IncreaseContainersResourceResponse increaseResponse =
ContainerUpdateRequest updateRequest =
ContainerUpdateRequest.newInstance(increaseTokens);
ContainerUpdateResponse updateResponse =
getContainerManager()
.increaseContainersResource(increaseRequest);
.updateContainer(updateRequest);
Assert.assertEquals(
1, increaseResponse.getSuccessfullyIncreasedContainers()
1, updateResponse.getSuccessfullyUpdatedContainers()
.size());
Assert.assertTrue(increaseResponse.getFailedRequests().isEmpty());
Assert.assertTrue(updateResponse.getFailedRequests().isEmpty());
} catch (Exception e) {
e.printStackTrace();
} finally {

View File

@ -47,10 +47,10 @@
import org.apache.hadoop.service.Service;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@ -1549,16 +1549,15 @@ public void testIncreaseContainerResourceWithInvalidRequests() throws Exception
context.getContainerTokenSecretManager(), null);
increaseTokens.add(containerToken);
IncreaseContainersResourceRequest increaseRequest =
IncreaseContainersResourceRequest
.newInstance(increaseTokens);
IncreaseContainersResourceResponse increaseResponse =
containerManager.increaseContainersResource(increaseRequest);
ContainerUpdateRequest updateRequest =
ContainerUpdateRequest.newInstance(increaseTokens);
ContainerUpdateResponse updateResponse =
containerManager.updateContainer(updateRequest);
// Check response
Assert.assertEquals(
0, increaseResponse.getSuccessfullyIncreasedContainers().size());
Assert.assertEquals(2, increaseResponse.getFailedRequests().size());
for (Map.Entry<ContainerId, SerializedException> entry : increaseResponse
0, updateResponse.getSuccessfullyUpdatedContainers().size());
Assert.assertEquals(2, updateResponse.getFailedRequests().size());
for (Map.Entry<ContainerId, SerializedException> entry : updateResponse
.getFailedRequests().entrySet()) {
Assert.assertNotNull("Failed message", entry.getValue().getMessage());
if (cId0.equals(entry.getKey())) {
@ -1635,16 +1634,15 @@ public void testIncreaseContainerResourceWithInvalidResource() throws Exception
Resource.newInstance(512, 1),
context.getContainerTokenSecretManager(), null);
increaseTokens.add(containerToken);
IncreaseContainersResourceRequest increaseRequest =
IncreaseContainersResourceRequest
.newInstance(increaseTokens);
IncreaseContainersResourceResponse increaseResponse =
containerManager.increaseContainersResource(increaseRequest);
ContainerUpdateRequest updateRequest =
ContainerUpdateRequest.newInstance(increaseTokens);
ContainerUpdateResponse updateResponse =
containerManager.updateContainer(updateRequest);
// Check response
Assert.assertEquals(
0, increaseResponse.getSuccessfullyIncreasedContainers().size());
Assert.assertEquals(1, increaseResponse.getFailedRequests().size());
for (Map.Entry<ContainerId, SerializedException> entry : increaseResponse
0, updateResponse.getSuccessfullyUpdatedContainers().size());
Assert.assertEquals(1, updateResponse.getFailedRequests().size());
for (Map.Entry<ContainerId, SerializedException> entry : updateResponse
.getFailedRequests().entrySet()) {
if (cId.equals(entry.getKey())) {
Assert.assertNotNull("Failed message", entry.getValue().getMessage());
@ -1717,13 +1715,13 @@ public void testChangeContainerResource() throws Exception {
context.getNodeId(), user, targetResource,
context.getContainerTokenSecretManager(), null);
increaseTokens.add(containerToken);
IncreaseContainersResourceRequest increaseRequest =
IncreaseContainersResourceRequest.newInstance(increaseTokens);
IncreaseContainersResourceResponse increaseResponse =
containerManager.increaseContainersResource(increaseRequest);
ContainerUpdateRequest updateRequest =
ContainerUpdateRequest.newInstance(increaseTokens);
ContainerUpdateResponse updateResponse =
containerManager.updateContainer(updateRequest);
Assert.assertEquals(
1, increaseResponse.getSuccessfullyIncreasedContainers().size());
Assert.assertTrue(increaseResponse.getFailedRequests().isEmpty());
1, updateResponse.getSuccessfullyUpdatedContainers().size());
Assert.assertTrue(updateResponse.getFailedRequests().isEmpty());
// Check status
List<ContainerId> containerIds = new ArrayList<>();
containerIds.add(cId);

View File

@ -49,9 +49,9 @@
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@ -460,9 +460,9 @@ public void testContainerResizeRecovery() throws Exception {
org.apache.hadoop.yarn.server.nodemanager
.containermanager.container.ContainerState.RUNNING);
Resource targetResource = Resource.newInstance(2048, 2);
IncreaseContainersResourceResponse increaseResponse =
increaseContainersResource(context, cm, cid, targetResource);
assertTrue(increaseResponse.getFailedRequests().isEmpty());
ContainerUpdateResponse updateResponse =
updateContainers(context, cm, cid, targetResource);
assertTrue(updateResponse.getFailedRequests().isEmpty());
// check status
ContainerStatus containerStatus = getContainerStatus(context, cm, cid);
assertEquals(targetResource, containerStatus.getCapability());
@ -643,7 +643,7 @@ public StartContainersResponse run() throws Exception {
});
}
private IncreaseContainersResourceResponse increaseContainersResource(
private ContainerUpdateResponse updateContainers(
Context context, final ContainerManagerImpl cm, ContainerId cid,
Resource capability) throws Exception {
UserGroupInformation user = UserGroupInformation.createRemoteUser(
@ -655,18 +655,18 @@ private IncreaseContainersResourceResponse increaseContainersResource(
cid, 0, context.getNodeId(), user.getShortUserName(),
capability, context.getContainerTokenSecretManager(), null);
increaseTokens.add(containerToken);
final IncreaseContainersResourceRequest increaseRequest =
IncreaseContainersResourceRequest.newInstance(increaseTokens);
final ContainerUpdateRequest updateRequest =
ContainerUpdateRequest.newInstance(increaseTokens);
NMTokenIdentifier nmToken = new NMTokenIdentifier(
cid.getApplicationAttemptId(), context.getNodeId(),
user.getShortUserName(),
context.getNMTokenSecretManager().getCurrentKey().getKeyId());
user.addTokenIdentifier(nmToken);
return user.doAs(
new PrivilegedExceptionAction<IncreaseContainersResourceResponse>() {
new PrivilegedExceptionAction<ContainerUpdateResponse>() {
@Override
public IncreaseContainersResourceResponse run() throws Exception {
return cm.increaseContainersResource(increaseRequest);
public ContainerUpdateResponse run() throws Exception {
return cm.updateContainer(updateRequest);
}
});
}

View File

@ -26,6 +26,8 @@
import java.util.Map;
import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
@ -306,12 +308,19 @@ synchronized public GetContainerStatusesResponse getContainerStatuses(
}
@Override
@Deprecated
public IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest request)
throws YarnException, IOException {
return null;
}
@Override
public ContainerUpdateResponse updateContainer(ContainerUpdateRequest
request) throws YarnException, IOException {
return null;
}
public static org.apache.hadoop.yarn.server.api.records.NodeStatus
createNodeStatus(NodeId nodeId, List<ContainerStatus> containers) {
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);

View File

@ -42,6 +42,8 @@
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
@ -156,12 +158,19 @@ public GetContainerStatusesResponse getContainerStatuses(
return GetContainerStatusesResponse.newInstance(null, null);
}
@Deprecated
@Override
public IncreaseContainersResourceResponse increaseContainersResource(IncreaseContainersResourceRequest request)
throws YarnException {
return IncreaseContainersResourceResponse.newInstance(null, null);
}
@Override
public ContainerUpdateResponse updateContainer(ContainerUpdateRequest
request) throws YarnException, IOException {
return ContainerUpdateResponse.newInstance(null, null);
}
public Credentials getContainerCredentials() throws IOException {
Credentials credentials = new Credentials();
DataInputByteBuffer buf = new DataInputByteBuffer();

View File

@ -34,6 +34,8 @@
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
@ -148,6 +150,7 @@ public GetContainerStatusesResponse getContainerStatuses(
}
@Override
@Deprecated
public IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest request)
throws YarnException {
@ -190,6 +193,12 @@ public CommitResponse commitLastReInitialization(ContainerId containerId)
throws YarnException, IOException {
return null;
}
@Override
public ContainerUpdateResponse updateContainer(ContainerUpdateRequest
request) throws YarnException, IOException {
return null;
}
}
@Test