Merge r1479733 through r1480439 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1480440 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2013-05-08 20:20:56 +00:00
commit 37d6f14560
54 changed files with 1886 additions and 133 deletions

View File

@ -547,7 +547,10 @@ Trunk (Unreleased)
HADOOP-9483. winutils support for readlink command.
(Arpit Agarwal via suresh)
HADOOP-9540. Expose the InMemoryS3 and S3N FilesystemStores implementations for Unit testing.
(Hari via stevel)
Release 2.0.5-beta - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -41,7 +41,7 @@ import org.apache.hadoop.fs.s3.INode.FileType;
* A stub implementation of {@link FileSystemStore} for testing
* {@link S3FileSystem} without actually connecting to S3.
*/
class InMemoryFileSystemStore implements FileSystemStore {
public class InMemoryFileSystemStore implements FileSystemStore {
private Configuration conf;
private SortedMap<Path, INode> inodes = new TreeMap<Path, INode>();

View File

@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3;
import org.apache.hadoop.fs.s3.S3FileSystem;
import org.apache.hadoop.fs.s3.InMemoryFileSystemStore;
/**
* A helper implementation of {@link S3FileSystem}
* without actually connecting to S3 for unit testing.
*/
public class S3InMemoryFileSystem extends S3FileSystem {
public S3InMemoryFileSystem() {
super(new InMemoryFileSystemStore());
}
}

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
public class TestS3InMemoryFileSystem extends TestCase {
private static final String TEST_PATH = "s3://test/data.txt";
private static final String TEST_DATA = "Sample data for testing.";
private S3InMemoryFileSystem fs;
@Override
public void setUp() throws IOException {
fs = new S3InMemoryFileSystem();
fs.initialize(URI.create("s3://test/"), new Configuration());
}
public void testBasicReadWriteIO() throws IOException {
FSDataOutputStream writeStream = fs.create(new Path(TEST_PATH));
writeStream.write(TEST_DATA.getBytes());
writeStream.flush();
writeStream.close();
FSDataInputStream readStream = fs.open(new Path(TEST_PATH));
BufferedReader br = new BufferedReader(new InputStreamReader(readStream));
String line = "";
StringBuffer stringBuffer = new StringBuffer();
while ((line = br.readLine()) != null) {
stringBuffer.append(line);
}
br.close();
assertEquals(TEST_DATA, stringBuffer.toString());
}
@Override
public void tearDown() throws IOException {
fs.close();
}
}

View File

@ -47,7 +47,7 @@ import org.apache.hadoop.util.Time;
* {@link NativeS3FileSystem} without actually connecting to S3.
* </p>
*/
class InMemoryNativeFileSystemStore implements NativeFileSystemStore {
public class InMemoryNativeFileSystemStore implements NativeFileSystemStore {
private Configuration conf;

View File

@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3native;
import org.apache.hadoop.fs.s3native.NativeS3FileSystem;
import org.apache.hadoop.fs.s3native.InMemoryNativeFileSystemStore;
/**
* A helper implementation of {@link NativeS3FileSystem}
* without actually connecting to S3 for unit testing.
*/
public class S3NInMemoryFileSystem extends NativeS3FileSystem {
public S3NInMemoryFileSystem() {
super(new InMemoryNativeFileSystemStore());
}
}

View File

@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3native;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
public class TestS3NInMemoryFileSystem extends TestCase {
private static final String TEST_PATH = "s3n://test/data.txt";
private static final String TEST_DATA = "Sample data for testing.";
private S3NInMemoryFileSystem fs;
@Override
public void setUp() throws IOException {
fs = new S3NInMemoryFileSystem();
fs.initialize(URI.create("s3n://test/"), new Configuration());
}
public void testBasicReadWriteIO() throws IOException {
FSDataOutputStream writeData = fs.create(new Path(TEST_PATH));
writeData.write(TEST_DATA.getBytes());
writeData.flush();
writeData.close();
FSDataInputStream readData = fs.open(new Path(TEST_PATH));
BufferedReader br = new BufferedReader(new InputStreamReader(readData));
String line = "";
StringBuffer stringBuffer = new StringBuffer();
while ((line = br.readLine()) != null) {
stringBuffer.append(line);
}
br.close();
assertEquals(TEST_DATA, stringBuffer.toString());
}
@Override
public void tearDown() throws IOException {
fs.close();
}
}

View File

@ -496,6 +496,9 @@ Release 2.0.5-beta - UNRELEASED
the datanode with the most recent heartbeat as the primary. (Varun Sharma
via szetszwo)
HDFS-4804. WARN when users set the block balanced preference percent below
0.5 or above 1.0. (Stephen Chu via atm)
OPTIMIZATIONS
BUG FIXES

View File

@ -64,6 +64,16 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
" = " + balancedSpaceThreshold + ", " +
DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_BALANCED_SPACE_PREFERENCE_PERCENT_KEY +
" = " + balancedPreferencePercent);
if (balancedPreferencePercent > 1.0) {
LOG.warn("The value of " + DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_BALANCED_SPACE_PREFERENCE_PERCENT_KEY +
" is greater than 1.0 but should be in the range 0.0 - 1.0");
}
if (balancedPreferencePercent < 0.5) {
LOG.warn("The value of " + DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_BALANCED_SPACE_PREFERENCE_PERCENT_KEY +
" is less than 0.5 so volumes with less available disk space will receive more block allocations");
}
}
@Override
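
A minimal sketch (not part of this change) of configuring the policy so the new warnings stay quiet; it assumes the key constant referenced in the hunk above is exported via DFSConfigKeys:

Configuration conf = new Configuration();
// keep the preference inside the recommended 0.5 - 1.0 range
conf.setFloat(
    DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_BALANCED_SPACE_PREFERENCE_PERCENT_KEY,
    0.75f);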

View File

@ -221,6 +221,9 @@ Release 2.0.5-beta - UNRELEASED
MAPREDUCE-5036. Default shuffle handler port should not be 8080.
(Sandy Ryza via tomwhite)
MAPREDUCE-5159. Change ValueAggregatorJob to add APIs which can support
binary compatibility with hadoop-1 examples. (Zhijie Shen via vinodkv)
OPTIMIZATIONS
MAPREDUCE-4974. Optimising the LineRecordReader initialize() method

View File

@ -102,15 +102,17 @@ public class ValueAggregatorJob {
/**
* Create an Aggregate based map/reduce job.
*
*
* @param args the arguments used for job creation. Generic hadoop
* arguments are accepted.
* @param caller the caller class.
* @return a JobConf object ready for submission.
*
*
* @throws IOException
* @see GenericOptionsParser
*/
public static JobConf createValueAggregatorJob(String args[])
@SuppressWarnings("rawtypes")
public static JobConf createValueAggregatorJob(String args[], Class<?> caller)
throws IOException {
Configuration conf = new Configuration();
@ -159,7 +161,7 @@ public class ValueAggregatorJob {
}
String userJarFile = theJob.get("user.jar.file");
if (userJarFile == null) {
theJob.setJarByClass(ValueAggregator.class);
theJob.setJarByClass(caller != null ? caller : ValueAggregatorJob.class);
} else {
theJob.setJar(userJarFile);
}
@ -183,6 +185,21 @@ public class ValueAggregatorJob {
return theJob;
}
/**
* Create an Aggregate based map/reduce job.
*
* @param args the arguments used for job creation. Generic hadoop
* arguments are accepted.
* @return a JobConf object ready for submission.
*
* @throws IOException
* @see GenericOptionsParser
*/
public static JobConf createValueAggregatorJob(String args[])
throws IOException {
return createValueAggregatorJob(args, ValueAggregator.class);
}
public static JobConf createValueAggregatorJob(String args[]
, Class<? extends ValueAggregatorDescriptor>[] descriptors)
throws IOException {
@ -199,7 +216,15 @@ public class ValueAggregatorJob {
job.set("aggregator.descriptor." + i, "UserDefined," + descriptors[i].getName());
}
}
public static JobConf createValueAggregatorJob(String args[],
Class<? extends ValueAggregatorDescriptor>[] descriptors,
Class<?> caller) throws IOException {
JobConf job = createValueAggregatorJob(args, caller);
setAggregatorDescriptors(job, descriptors);
return job;
}
/**
* create and run an Aggregate based map/reduce job.
*
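
A hedged usage sketch of the new overload: a hadoop-1-style driver passes its own class so setJarByClass() resolves the user jar rather than the jar containing ValueAggregatorJob. WordCountAggregator is a hypothetical example class:

public class WordCountAggregator {
  public static void main(String[] args) throws IOException {
    JobConf job = ValueAggregatorJob.createValueAggregatorJob(
        args, WordCountAggregator.class);
    JobClient.runJob(job);
  }
}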

View File

@ -10,6 +10,9 @@ Trunk - Unreleased
Azure environments. (See breakdown of tasks below for subtasks and
contributors)
YARN-45. Add protocol for schedulers to request containers back from
ApplicationMasters. (Carlo Curino, cdouglas)
IMPROVEMENTS
YARN-84. Use Builder to build RPC server. (Brandon Li via suresh)
@ -109,6 +112,12 @@ Release 2.0.5-beta - UNRELEASED
YARN-629. Make YarnRemoteException not be rooted at IOException. (Xuan Gong
via vinodkv)
YARN-633. Changed RMAdminProtocol api to throw IOException and
YarnRemoteException. (Xuan Gong via vinodkv)
YARN-632. Changed ContainerManager api to throw IOException and
YarnRemoteException. (Xuan Gong via vinodkv)
NEW FEATURES
YARN-482. FS: Extend SchedulingMode to intermediate queues.
@ -213,6 +222,13 @@ Release 2.0.5-beta - UNRELEASED
YARN-645. Moved RMDelegationTokenSecretManager from yarn-server-common to
yarn-server-resourcemanager where it really belongs. (Jian He via vinodkv)
YARN-651. Changed PBClientImpls of ContainerManager and RMAdmin to throw
IOExceptions also. (Xuan Gong via vinodkv)
YARN-582. Changed ResourceManager to recover Application token and client
tokens for app attempt so that RM can be restarted while preserving current
applications. (Jian He via vinodkv)
OPTIMIZATIONS
BUG FIXES
@ -335,6 +351,8 @@ Release 2.0.5-beta - UNRELEASED
YARN-576. Modified ResourceManager to reject NodeManagers that don't satisy
minimum resource requirements. (Kenji Kikushima via vinodkv)
YARN-646. Fix two typos in Fair Scheduler user guide. (Dapeng Sun via atm)
Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.api;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
@ -68,11 +70,12 @@ public interface ContainerManager {
* @return empty response to indicate acceptance of the request
* or an exception
* @throws YarnRemoteException
* @throws IOException
*/
@Public
@Stable
StartContainerResponse startContainer(StartContainerRequest request)
throws YarnRemoteException;
throws YarnRemoteException, IOException;
/**
* <p>The <code>ApplicationMaster</code> requests a <code>NodeManager</code>
@ -94,11 +97,12 @@ public interface ContainerManager {
* @return empty response to indicate acceptance of the request
* or an exception
* @throws YarnRemoteException
* @throws IOException
*/
@Public
@Stable
StopContainerResponse stopContainer(StopContainerRequest request)
throws YarnRemoteException;
throws YarnRemoteException, IOException;
/**
* <p>The api used by the <code>ApplicationMaster</code> to request for
@ -118,9 +122,11 @@ public interface ContainerManager {
* @return response containing the <code>ContainerStatus</code> of the
* container
* @throws YarnRemoteException
* @throws IOException
*/
@Public
@Stable
GetContainerStatusResponse getContainerStatus(
GetContainerStatusRequest request) throws YarnRemoteException;
GetContainerStatusRequest request) throws YarnRemoteException,
IOException;
}
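
A hedged sketch of a caller after this signature change; cm is an assumed ContainerManager proxy and request an already-built StartContainerRequest:

try {
  StartContainerResponse response = cm.startContainer(request);
} catch (YarnRemoteException e) {
  // failure reported by the remote NodeManager
} catch (IOException e) {
  // transport-level failure, now surfaced directly by the API
}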

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.api;
import java.io.IOException;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.api.protocolrecords.RefreshAdminAclsRequest;
@ -35,25 +37,25 @@ import org.apache.hadoop.yarn.api.protocolrecords.RefreshUserToGroupsMappingsRes
public interface RMAdminProtocol extends GetUserMappingsProtocol {
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
throws YarnRemoteException;
throws YarnRemoteException, IOException;
public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
throws YarnRemoteException;
throws YarnRemoteException, IOException;
public RefreshSuperUserGroupsConfigurationResponse
refreshSuperUserGroupsConfiguration(
RefreshSuperUserGroupsConfigurationRequest request)
throws YarnRemoteException;
throws YarnRemoteException, IOException;
public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
RefreshUserToGroupsMappingsRequest request)
throws YarnRemoteException;
throws YarnRemoteException, IOException;
public RefreshAdminAclsResponse refreshAdminAcls(
RefreshAdminAclsRequest request)
throws YarnRemoteException;
throws YarnRemoteException, IOException;
public RefreshServiceAclsResponse refreshServiceAcls(
RefreshServiceAclsRequest request)
throws YarnRemoteException;
throws YarnRemoteException, IOException;
}

View File

@ -22,6 +22,7 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.AMRMProtocol;
@ -48,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
* </li>
* <li>A list of nodes whose status has been updated.</li>
* <li>The number of available nodes in a cluster.</li>
* <li>A description of resources requested back by the cluster</li>
* </ul>
* </p>
*
@ -152,4 +154,27 @@ public interface AllocateResponse {
@Private
@Unstable
public void setNumClusterNodes(int numNodes);
/**
* Get the description of containers owned by the AM, but requested back by
* the cluster. Note that the RM may have an inconsistent view of the
* resources owned by the AM. These messages are advisory, and the AM may
* elect to ignore them.
*
* The message is a snapshot of the resources the RM wants back from the AM.
* While demand persists, the RM will repeat its request; applications should
* not interpret each message as a request for <em>additional</em>
* resources on top of previous messages. Resources requested consistently
* over some duration may be forcibly killed by the RM.
*
* @return A specification of the resources to reclaim from this AM.
*/
@Public
@Evolving
public PreemptionMessage getPreemptionMessage();
@Private
@Unstable
public void setPreemptionMessage(PreemptionMessage request);
}
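
A hedged sketch of how an AM might poll for this message on each heartbeat, per the advisory semantics documented above; the allocate() call, its request, and handlePreemption are assumed to exist in the surrounding AM code:

AllocateResponse response = resourceManager.allocate(allocateRequest);
PreemptionMessage preempt = response.getPreemptionMessage();
if (preempt != null) {
  // advisory snapshot: the AM may act on it or ignore it
  handlePreemption(preempt);  // hypothetical helper
}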

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ContainerId;
/**
* Specific container requested back by the <code>ResourceManager</code>.
* @see PreemptionContract
* @see StrictPreemptionContract
*/
public interface PreemptionContainer {
/**
* @return Container referenced by this handle.
*/
@Public
@Evolving
public ContainerId getId();
@Private
@Unstable
public void setId(ContainerId id);
}

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* Description of resources requested back by the <code>ResourceManager</code>.
* The <code>ApplicationMaster</code> (AM) can satisfy this request according
* to its own priorities to prevent containers from being forcibly killed by
* the platform.
* @see PreemptionMessage
*/
public interface PreemptionContract {
/**
* If the AM releases resources matching these requests, then the {@link
* PreemptionContainer}s enumerated in {@link #getContainers()} should not be
* evicted from the cluster. Due to delays in propagating cluster state and
* sending these messages, there are conditions where satisfied contracts may
* not prevent the platform from killing containers.
* @return List of {@link PreemptionResourceRequest} to update the
* <code>ApplicationMaster</code> about resources requested back by the
* <code>ResourceManager</code>.
* @see AllocateRequest#setAskList(List)
*/
@Public
@Evolving
public List<PreemptionResourceRequest> getResourceRequest();
@Private
@Unstable
public void setResourceRequest(List<PreemptionResourceRequest> req);
/**
* Get the set of {@link PreemptionContainer} specifying which containers
* owned by the <code>ApplicationMaster</code> may be reclaimed by the
* <code>ResourceManager</code>. If the AM prefers a different set of
* containers, then it may checkpoint or kill containers matching the
* description in {@link #getResourceRequest}.
* @return Set of containers at risk if the contract is not met.
*/
@Public
@Evolving
public Set<PreemptionContainer> getContainers();
@Private
@Unstable
public void setContainers(Set<PreemptionContainer> containers);
}

View File

@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* A {@link PreemptionMessage} is part of the RM-AM protocol, and it is used by
* the RM to specify resources that the RM wants to reclaim from this
* <code>ApplicationMaster</code> (AM). The AM receives a {@link
* StrictPreemptionContract} message encoding which containers the platform may
* forcibly kill, granting it an opportunity to checkpoint state or adjust its
* execution plan. The message may also include a {@link PreemptionContract}
* granting the AM more latitude in selecting which resources to return to the
* cluster.
*
* The AM should decode both parts of the message. The {@link
* StrictPreemptionContract} specifies particular allocations that the RM
* requires back. The AM can checkpoint containers' state, adjust its execution
* plan to move the computation, or take no action and hope that conditions that
* caused the RM to ask for the container will change.
*
* In contrast, the {@link PreemptionContract} also includes a description of
* resources with a set of containers. If the AM releases containers matching
* that profile, then the containers enumerated in {@link
* PreemptionContract#getContainers()} may not be killed.
*
* Each preemption message reflects the RM's current understanding of the
* cluster state, so a request to return <em>N</em> containers may not
* reflect containers the AM is releasing, recently exited containers the RM has
* yet to learn about, or new containers allocated before the message was
* generated. Conversely, an RM may request a different profile of containers in
* subsequent requests.
*
* The policy enforced by the RM is part of the scheduler. Generally, only
* containers that have been requested consistently should be killed, but the
* details are not specified.
*/
@Public
@Evolving
public interface PreemptionMessage {
/**
* @return Specific resources that may be killed by the
* <code>ResourceManager</code>
*/
@Public
@Evolving
public StrictPreemptionContract getStrictContract();
@Private
@Unstable
public void setStrictContract(StrictPreemptionContract set);
/**
* @return Contract describing resources to return to the cluster.
*/
@Public
@Evolving
public PreemptionContract getContract();
@Private
@Unstable
public void setContract(PreemptionContract contract);
}
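
A hedged sketch of AM-side handling that follows the semantics above; checkpointAndRelease and tryReleaseMatching are hypothetical helpers:

void onPreemption(PreemptionMessage msg) {
  StrictPreemptionContract strict = msg.getStrictContract();
  if (strict != null) {
    // these specific containers will be reclaimed regardless
    for (PreemptionContainer c : strict.getContainers()) {
      checkpointAndRelease(c.getId());
    }
  }
  PreemptionContract contract = msg.getContract();
  if (contract != null) {
    // satisfying the resource request may spare the listed containers
    tryReleaseMatching(contract.getResourceRequest());
  }
}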

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
/**
* Description of resources requested back by the cluster.
* @see PreemptionContract
* @see AllocateRequest#setAskList(java.util.List)
*/
public interface PreemptionResourceRequest {
/**
* @return Resource described in this request, to be matched against running
* containers.
*/
@Public
@Evolving
public ResourceRequest getResourceRequest();
@Private
@Unstable
public void setResourceRequest(ResourceRequest req);
}

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ContainerId;
/**
* Enumeration of particular allocations to be reclaimed. The platform will
* reclaim exactly these resources, so the <code>ApplicationMaster</code> (AM)
* may attempt to checkpoint work or adjust its execution plan to accommodate
* it. In contrast to {@link PreemptionContract}, the AM has no flexibility in
* selecting which resources to return to the cluster.
* @see PreemptionMessage
*/
@Public
@Evolving
public interface StrictPreemptionContract {
/**
* Get the set of {@link PreemptionContainer} specifying containers owned by
* the <code>ApplicationMaster</code> that may be reclaimed by the
* <code>ResourceManager</code>.
* @return the set of {@link PreemptionContainer} to be preempted.
*/
@Public
@Evolving
public Set<PreemptionContainer> getContainers();
@Private
@Unstable
public void setContainers(Set<PreemptionContainer> containers);
}

View File

@ -24,6 +24,7 @@ import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
@ -39,7 +40,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionMessageProto;
public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto>
@ -54,6 +55,7 @@ public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto>
private List<ContainerStatus> completedContainersStatuses = null;
private List<NodeReport> updatedNodes = null;
private PreemptionMessage preempt;
public AllocateResponsePBImpl() {
@ -94,6 +96,9 @@ public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto>
if (this.limit != null) {
builder.setLimit(convertToProtoFormat(this.limit));
}
if (this.preempt != null) {
builder.setPreempt(convertToProtoFormat(this.preempt));
}
}
private synchronized void mergeLocalToProto() {
@ -217,6 +222,28 @@ public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto>
builder.setNumClusterNodes(numNodes);
}
@Override
public synchronized PreemptionMessage getPreemptionMessage() {
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
if (this.preempt != null) {
return this.preempt;
}
if (!p.hasPreempt()) {
return null;
}
this.preempt = convertFromProtoFormat(p.getPreempt());
return this.preempt;
}
@Override
public synchronized void setPreemptionMessage(PreemptionMessage preempt) {
maybeInitBuilder();
if (null == preempt) {
builder.clearPreempt();
}
this.preempt = preempt;
}
// Once this is called, updatedNodes will never be null until getProto is
// called.
private synchronized void initLocalNewNodeReportList() {
@ -393,4 +420,11 @@ public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto>
return ((ResourcePBImpl) r).getProto();
}
private synchronized PreemptionMessagePBImpl convertFromProtoFormat(PreemptionMessageProto p) {
return new PreemptionMessagePBImpl(p);
}
private synchronized PreemptionMessageProto convertToProtoFormat(PreemptionMessage r) {
return ((PreemptionMessagePBImpl)r).getProto();
}
}

View File

@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.protocolrecords.PreemptionContainer;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionContainerProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionContainerProtoOrBuilder;
public class PreemptionContainerPBImpl implements PreemptionContainer {
PreemptionContainerProto proto =
PreemptionContainerProto.getDefaultInstance();
PreemptionContainerProto.Builder builder = null;
boolean viaProto = false;
private ContainerId id;
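// These PBImpl records are backed either by an immutable proto
// (viaProto == true) or by a mutable builder; setters cache values in
// local fields, and getProto() merges the cached fields back into the
// builder before rebuilding the proto.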
public PreemptionContainerPBImpl() {
builder = PreemptionContainerProto.newBuilder();
}
public PreemptionContainerPBImpl(PreemptionContainerProto proto) {
this.proto = proto;
viaProto = true;
}
public synchronized PreemptionContainerProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void mergeLocalToBuilder() {
if (id != null) {
builder.setId(convertToProtoFormat(id));
}
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = PreemptionContainerProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public synchronized ContainerId getId() {
PreemptionContainerProtoOrBuilder p = viaProto ? proto : builder;
if (id != null) {
return id;
}
if (!p.hasId()) {
return null;
}
id = convertFromProtoFormat(p.getId());
return id;
}
@Override
public synchronized void setId(final ContainerId id) {
maybeInitBuilder();
if (null == id) {
builder.clearId();
}
this.id = id;
}
private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
return new ContainerIdPBImpl(p);
}
private ContainerIdProto convertToProtoFormat(ContainerId t) {
return ((ContainerIdPBImpl)t).getProto();
}
}

View File

@ -0,0 +1,228 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.yarn.api.protocolrecords.PreemptionContainer;
import org.apache.hadoop.yarn.api.protocolrecords.PreemptionContract;
import org.apache.hadoop.yarn.api.protocolrecords.PreemptionResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.PreemptionResourceRequestPBImpl;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionContainerProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionContractProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionContractProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionResourceRequestProto;
public class PreemptionContractPBImpl implements PreemptionContract {
PreemptionContractProto proto = PreemptionContractProto.getDefaultInstance();
PreemptionContractProto.Builder builder = null;
boolean viaProto = false;
private Set<PreemptionContainer> containers;
private List<PreemptionResourceRequest> resources;
public PreemptionContractPBImpl() {
builder = PreemptionContractProto.newBuilder();
}
public PreemptionContractPBImpl(PreemptionContractProto proto) {
this.proto = proto;
viaProto = true;
}
public synchronized PreemptionContractProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void mergeLocalToBuilder() {
if (this.resources != null) {
addResourcesToProto();
}
if (this.containers != null) {
addContainersToProto();
}
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = PreemptionContractProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public synchronized Set<PreemptionContainer> getContainers() {
initPreemptionContainers();
return containers;
}
@Override
public synchronized void setContainers(
final Set<PreemptionContainer> containers) {
maybeInitBuilder();
if (null == containers) {
builder.clearContainer();
}
this.containers = containers;
}
@Override
public synchronized List<PreemptionResourceRequest> getResourceRequest() {
initPreemptionResourceRequests();
return resources;
}
@Override
public synchronized void setResourceRequest(
final List<PreemptionResourceRequest> req) {
maybeInitBuilder();
if (null == req) {
builder.clearResource();
}
this.resources = req;
}
private void initPreemptionResourceRequests() {
if (resources != null) {
return;
}
PreemptionContractProtoOrBuilder p = viaProto ? proto : builder;
List<PreemptionResourceRequestProto> list = p.getResourceList();
resources = new ArrayList<PreemptionResourceRequest>();
for (PreemptionResourceRequestProto rr : list) {
resources.add(convertFromProtoFormat(rr));
}
}
private void addResourcesToProto() {
maybeInitBuilder();
builder.clearResource();
if (null == resources) {
return;
}
Iterable<PreemptionResourceRequestProto> iterable =
new Iterable<PreemptionResourceRequestProto>() {
@Override
public Iterator<PreemptionResourceRequestProto> iterator() {
return new Iterator<PreemptionResourceRequestProto>() {
Iterator<PreemptionResourceRequest> iter = resources.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public PreemptionResourceRequestProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllResource(iterable);
}
private void initPreemptionContainers() {
if (containers != null) {
return;
}
PreemptionContractProtoOrBuilder p = viaProto ? proto : builder;
List<PreemptionContainerProto> list = p.getContainerList();
containers = new HashSet<PreemptionContainer>();
for (PreemptionContainerProto c : list) {
containers.add(convertFromProtoFormat(c));
}
}
private void addContainersToProto() {
maybeInitBuilder();
builder.clearContainer();
if (null == containers) {
return;
}
Iterable<PreemptionContainerProto> iterable =
new Iterable<PreemptionContainerProto>() {
@Override
public Iterator<PreemptionContainerProto> iterator() {
return new Iterator<PreemptionContainerProto>() {
Iterator<PreemptionContainer> iter = containers.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public PreemptionContainerProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllContainer(iterable);
}
private PreemptionContainerPBImpl convertFromProtoFormat(PreemptionContainerProto p) {
return new PreemptionContainerPBImpl(p);
}
private PreemptionContainerProto convertToProtoFormat(PreemptionContainer t) {
return ((PreemptionContainerPBImpl)t).getProto();
}
private PreemptionResourceRequestPBImpl convertFromProtoFormat(PreemptionResourceRequestProto p) {
return new PreemptionResourceRequestPBImpl(p);
}
private PreemptionResourceRequestProto convertToProtoFormat(PreemptionResourceRequest t) {
return ((PreemptionResourceRequestPBImpl)t).getProto();
}
}

View File

@ -0,0 +1,141 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.protocolrecords.PreemptionContract;
import org.apache.hadoop.yarn.api.protocolrecords.PreemptionMessage;
import org.apache.hadoop.yarn.api.protocolrecords.StrictPreemptionContract;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionContractProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionMessageProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionMessageProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StrictPreemptionContractProto;
public class PreemptionMessagePBImpl implements PreemptionMessage {
PreemptionMessageProto proto = PreemptionMessageProto.getDefaultInstance();
PreemptionMessageProto.Builder builder = null;
boolean viaProto = false;
private StrictPreemptionContract strict;
private PreemptionContract contract;
public PreemptionMessagePBImpl() {
builder = PreemptionMessageProto.newBuilder();
}
public PreemptionMessagePBImpl(PreemptionMessageProto proto) {
this.proto = proto;
viaProto = true;
}
public synchronized PreemptionMessageProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void mergeLocalToBuilder() {
if (strict != null) {
builder.setStrictContract(convertToProtoFormat(strict));
}
if (contract != null) {
builder.setContract(convertToProtoFormat(contract));
}
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = PreemptionMessageProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public synchronized StrictPreemptionContract getStrictContract() {
PreemptionMessageProtoOrBuilder p = viaProto ? proto : builder;
if (strict != null) {
return strict;
}
if (!p.hasStrictContract()) {
return null;
}
strict = convertFromProtoFormat(p.getStrictContract());
return strict;
}
@Override
public synchronized void setStrictContract(StrictPreemptionContract strict) {
maybeInitBuilder();
if (null == strict) {
builder.clearStrictContract();
}
this.strict = strict;
}
@Override
public synchronized PreemptionContract getContract() {
PreemptionMessageProtoOrBuilder p = viaProto ? proto : builder;
if (contract != null) {
return contract;
}
if (!p.hasContract()) {
return null;
}
contract = convertFromProtoFormat(p.getContract());
return contract;
}
@Override
public synchronized void setContract(final PreemptionContract c) {
maybeInitBuilder();
if (null == c) {
builder.clearContract();
}
this.contract = c;
}
private StrictPreemptionContractPBImpl convertFromProtoFormat(
StrictPreemptionContractProto p) {
return new StrictPreemptionContractPBImpl(p);
}
private StrictPreemptionContractProto convertToProtoFormat(
StrictPreemptionContract t) {
return ((StrictPreemptionContractPBImpl)t).getProto();
}
private PreemptionContractPBImpl convertFromProtoFormat(
PreemptionContractProto p) {
return new PreemptionContractPBImpl(p);
}
private PreemptionContractProto convertToProtoFormat(
PreemptionContract t) {
return ((PreemptionContractPBImpl)t).getProto();
}
}

View File

@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.protocolrecords.PreemptionResourceRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionResourceRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionResourceRequestProtoOrBuilder;
public class PreemptionResourceRequestPBImpl implements PreemptionResourceRequest {
PreemptionResourceRequestProto proto =
PreemptionResourceRequestProto.getDefaultInstance();
PreemptionResourceRequestProto.Builder builder = null;
boolean viaProto = false;
private ResourceRequest rr;
public PreemptionResourceRequestPBImpl() {
builder = PreemptionResourceRequestProto.newBuilder();
}
public PreemptionResourceRequestPBImpl(PreemptionResourceRequestProto proto) {
this.proto = proto;
viaProto = true;
}
public synchronized PreemptionResourceRequestProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void mergeLocalToBuilder() {
if (rr != null) {
builder.setResource(convertToProtoFormat(rr));
}
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = PreemptionResourceRequestProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public synchronized ResourceRequest getResourceRequest() {
PreemptionResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
if (rr != null) {
return rr;
}
if (!p.hasResource()) {
return null;
}
rr = convertFromProtoFormat(p.getResource());
return rr;
}
@Override
public synchronized void setResourceRequest(final ResourceRequest rr) {
maybeInitBuilder();
if (null == rr) {
builder.clearResource();
}
this.rr = rr;
}
private ResourceRequestPBImpl convertFromProtoFormat(ResourceRequestProto p) {
return new ResourceRequestPBImpl(p);
}
private ResourceRequestProto convertToProtoFormat(ResourceRequest t) {
return ((ResourceRequestPBImpl)t).getProto();
}
}

View File

@ -0,0 +1,148 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.yarn.api.protocolrecords.PreemptionContainer;
import org.apache.hadoop.yarn.api.protocolrecords.StrictPreemptionContract;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionContainerProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StrictPreemptionContractProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StrictPreemptionContractProtoOrBuilder;
public class StrictPreemptionContractPBImpl implements StrictPreemptionContract {
StrictPreemptionContractProto proto =
StrictPreemptionContractProto.getDefaultInstance();
StrictPreemptionContractProto.Builder builder = null;
boolean viaProto = false;
private Set<PreemptionContainer> containers;
public StrictPreemptionContractPBImpl() {
builder = StrictPreemptionContractProto.newBuilder();
}
public StrictPreemptionContractPBImpl(StrictPreemptionContractProto proto) {
this.proto = proto;
viaProto = true;
}
public synchronized StrictPreemptionContractProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void mergeLocalToBuilder() {
if (this.containers != null) {
addContainersToProto();
}
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = StrictPreemptionContractProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public synchronized Set<PreemptionContainer> getContainers() {
initIds();
return containers;
}
@Override
public synchronized void setContainers(
final Set<PreemptionContainer> containers) {
maybeInitBuilder();
if (null == containers) {
builder.clearContainer();
}
this.containers = containers;
}
private void initIds() {
if (containers != null) {
return;
}
StrictPreemptionContractProtoOrBuilder p = viaProto ? proto : builder;
List<PreemptionContainerProto> list = p.getContainerList();
containers = new HashSet<PreemptionContainer>();
for (PreemptionContainerProto c : list) {
containers.add(convertFromProtoFormat(c));
}
}
private void addContainersToProto() {
maybeInitBuilder();
builder.clearContainer();
if (containers == null) {
return;
}
Iterable<PreemptionContainerProto> iterable = new Iterable<PreemptionContainerProto>() {
@Override
public Iterator<PreemptionContainerProto> iterator() {
return new Iterator<PreemptionContainerProto>() {
Iterator<PreemptionContainer> iter = containers.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public PreemptionContainerProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllContainer(iterable);
}
private PreemptionContainerPBImpl convertFromProtoFormat(PreemptionContainerProto p) {
return new PreemptionContainerPBImpl(p);
}
private PreemptionContainerProto convertToProtoFormat(PreemptionContainer t) {
return ((PreemptionContainerPBImpl)t).getProto();
}
}

View File

@ -72,4 +72,5 @@ message ApplicationStateDataProto {
message ApplicationAttemptStateDataProto {
optional ApplicationAttemptIdProto attemptId = 1;
optional ContainerProto master_container = 2;
optional bytes app_attempt_tokens = 3;
}

View File

@ -66,9 +66,30 @@ message AllocateResponseProto {
optional ResourceProto limit = 5;
repeated NodeReportProto updated_nodes = 6;
optional int32 num_cluster_nodes = 7;
optional PreemptionMessageProto preempt = 8;
}
message PreemptionMessageProto {
optional StrictPreemptionContractProto strictContract = 1;
optional PreemptionContractProto contract = 2;
}
message StrictPreemptionContractProto {
repeated PreemptionContainerProto container = 1;
}
message PreemptionContractProto {
repeated PreemptionResourceRequestProto resource = 1;
repeated PreemptionContainerProto container = 2;
}
message PreemptionContainerProto {
optional ContainerIdProto id = 1;
}
message PreemptionResourceRequestProto {
optional ResourceRequestProto resource = 1;
}
//////////////////////////////////////////////////////
/////// client_RM_Protocol ///////////////////////////
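
A hedged sketch of how the new messages nest, using the generated protobuf builders; containerId is an assumed ContainerIdProto instance:

PreemptionMessageProto msg = PreemptionMessageProto.newBuilder()
    .setContract(PreemptionContractProto.newBuilder()
        .addContainer(PreemptionContainerProto.newBuilder()
            .setId(containerId)))
    .build();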

View File

@ -759,6 +759,10 @@ public class ApplicationMaster {
+ container.getId());
e.printStackTrace();
// TODO do we need to release this container?
} catch (IOException e) {
LOG.info("Start container failed for containerId=" + container.getId());
e.printStackTrace();
}
// Get container status?

View File

@ -113,7 +113,7 @@ public class TestAMRMClientAsync {
private AllocateResponse createAllocateResponse(
List<ContainerStatus> completed, List<Container> allocated) {
AllocateResponse response = BuilderUtils.newAllocateResponse(0, completed, allocated,
new ArrayList<NodeReport>(), null, false, 1);
new ArrayList<NodeReport>(), null, false, 1, null);
return response;
}

View File

@ -86,7 +86,8 @@ public class ContainerManagerPBClientImpl implements ContainerManager,
@Override
public GetContainerStatusResponse getContainerStatus(
GetContainerStatusRequest request) throws YarnRemoteException {
GetContainerStatusRequest request) throws YarnRemoteException,
IOException {
GetContainerStatusRequestProto requestProto =
((GetContainerStatusRequestPBImpl) request).getProto();
try {
@ -99,7 +100,7 @@ public class ContainerManagerPBClientImpl implements ContainerManager,
@Override
public StartContainerResponse startContainer(StartContainerRequest request)
throws YarnRemoteException {
throws YarnRemoteException, IOException {
StartContainerRequestProto requestProto =
((StartContainerRequestPBImpl) request).getProto();
try {
@ -112,7 +113,7 @@ public class ContainerManagerPBClientImpl implements ContainerManager,
@Override
public StopContainerResponse stopContainer(StopContainerRequest request)
throws YarnRemoteException {
throws YarnRemoteException, IOException {
StopContainerRequestProto requestProto =
((StopContainerRequestPBImpl) request).getProto();
try {

View File

@ -87,7 +87,7 @@ public class RMAdminProtocolPBClientImpl implements RMAdminProtocol, Closeable {
@Override
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
throws YarnRemoteException {
throws YarnRemoteException, IOException {
RefreshQueuesRequestProto requestProto =
((RefreshQueuesRequestPBImpl)request).getProto();
try {
@ -100,7 +100,7 @@ public class RMAdminProtocolPBClientImpl implements RMAdminProtocol, Closeable {
@Override
public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
throws YarnRemoteException {
throws YarnRemoteException, IOException {
RefreshNodesRequestProto requestProto =
((RefreshNodesRequestPBImpl)request).getProto();
try {
@ -114,7 +114,7 @@ public class RMAdminProtocolPBClientImpl implements RMAdminProtocol, Closeable {
@Override
public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
RefreshSuperUserGroupsConfigurationRequest request)
throws YarnRemoteException {
throws YarnRemoteException, IOException {
RefreshSuperUserGroupsConfigurationRequestProto requestProto =
((RefreshSuperUserGroupsConfigurationRequestPBImpl)request).getProto();
try {
@ -127,7 +127,8 @@ public class RMAdminProtocolPBClientImpl implements RMAdminProtocol, Closeable {
@Override
public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
RefreshUserToGroupsMappingsRequest request) throws YarnRemoteException {
RefreshUserToGroupsMappingsRequest request) throws YarnRemoteException,
IOException {
RefreshUserToGroupsMappingsRequestProto requestProto =
((RefreshUserToGroupsMappingsRequestPBImpl)request).getProto();
try {
@ -140,7 +141,7 @@ public class RMAdminProtocolPBClientImpl implements RMAdminProtocol, Closeable {
@Override
public RefreshAdminAclsResponse refreshAdminAcls(
RefreshAdminAclsRequest request) throws YarnRemoteException {
RefreshAdminAclsRequest request) throws YarnRemoteException, IOException {
RefreshAdminAclsRequestProto requestProto =
((RefreshAdminAclsRequestPBImpl)request).getProto();
try {
@ -153,7 +154,8 @@ public class RMAdminProtocolPBClientImpl implements RMAdminProtocol, Closeable {
@Override
public RefreshServiceAclsResponse refreshServiceAcls(
RefreshServiceAclsRequest request) throws YarnRemoteException {
RefreshServiceAclsRequest request) throws YarnRemoteException,
IOException {
RefreshServiceAclsRequestProto requestProto =
((RefreshServiceAclsRequestPBImpl)request).getProto();
try {

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.api.impl.pb.service;
import java.io.IOException;
import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.ContainerManagerPB;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
@@ -57,6 +59,8 @@ public class ContainerManagerPBServiceImpl implements ContainerManagerPB {
return ((GetContainerStatusResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) {
throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e);
}
}
@@ -69,6 +73,8 @@ public class ContainerManagerPBServiceImpl implements ContainerManagerPB {
return ((StartContainerResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) {
throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e);
}
}
@@ -81,6 +87,8 @@ public class ContainerManagerPBServiceImpl implements ContainerManagerPB {
return ((StopContainerResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) {
throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e);
}
}
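Each service-side method is the mirror image: both checked exceptions are folded back into a ServiceException so they can cross the protobuf RPC boundary. The full method shape, reconstructed from the hunks above (the field name "real" for the underlying ContainerManager is assumed):

@Override
public StopContainerResponseProto stopContainer(RpcController controller,
    StopContainerRequestProto proto) throws ServiceException {
  StopContainerRequestPBImpl request = new StopContainerRequestPBImpl(proto);
  try {
    // Delegate to the real ContainerManager implementation, then
    // convert the response record back to its protobuf form.
    StopContainerResponse response = real.stopContainer(request);
    return ((StopContainerResponsePBImpl) response).getProto();
  } catch (YarnRemoteException e) {
    throw new ServiceException(e);
  } catch (IOException e) {
    throw new ServiceException(e);
  }
}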

View File

@@ -63,6 +63,8 @@ public class RMAdminProtocolPBServiceImpl implements RMAdminProtocolPB {
return ((RefreshQueuesResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) {
throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e);
}
}
@@ -77,6 +79,8 @@ public class RMAdminProtocolPBServiceImpl implements RMAdminProtocolPB {
return ((RefreshAdminAclsResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) {
throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e);
}
}
@@ -89,6 +93,8 @@ public class RMAdminProtocolPBServiceImpl implements RMAdminProtocolPB {
return ((RefreshNodesResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) {
throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e);
}
}
@@ -106,6 +112,8 @@ public class RMAdminProtocolPBServiceImpl implements RMAdminProtocolPB {
return ((RefreshSuperUserGroupsConfigurationResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) {
throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e);
}
}
@@ -121,6 +129,8 @@ public class RMAdminProtocolPBServiceImpl implements RMAdminProtocolPB {
return ((RefreshUserToGroupsMappingsResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) {
throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e);
}
}
@@ -136,6 +146,8 @@ public class RMAdminProtocolPBServiceImpl implements RMAdminProtocolPB {
return ((RefreshServiceAclsResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) {
throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e);
}
}

View File

@@ -29,6 +29,7 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -404,7 +405,8 @@ public class BuilderUtils {
public static AllocateResponse newAllocateResponse(int responseId,
List<ContainerStatus> completedContainers,
List<Container> allocatedContainers, List<NodeReport> updatedNodes,
Resource availResources, boolean reboot, int numClusterNodes) {
Resource availResources, boolean reboot, int numClusterNodes,
PreemptionMessage preempt) {
AllocateResponse response = recordFactory
.newRecordInstance(AllocateResponse.class);
response.setNumClusterNodes(numClusterNodes);
@@ -414,6 +416,7 @@
response.setUpdatedNodes(updatedNodes);
response.setAvailableResources(availResources);
response.setReboot(reboot);
response.setPreemptionMessage(preempt);
return response;
}
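A hypothetical call site, shown only to make the widened factory signature concrete (all argument names here are illustrative):

AllocateResponse response = BuilderUtils.newAllocateResponse(
    responseId, completedContainers, allocatedContainers, updatedNodes,
    availResources, false /* reboot */, numClusterNodes,
    preempt /* PreemptionMessage; presumably null when nothing is preempted */);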

View File

@@ -396,7 +396,7 @@ public class ContainerManagerImpl extends CompositeService implements
@SuppressWarnings("unchecked")
@Override
public StartContainerResponse startContainer(StartContainerRequest request)
throws YarnRemoteException {
throws YarnRemoteException, IOException {
if (blockNewContainerRequests.get()) {
throw RPCUtil.getRemoteException(new NMNotYetReadyException(
@@ -503,7 +503,7 @@ public class ContainerManagerImpl extends CompositeService implements
@Override
@SuppressWarnings("unchecked")
public StopContainerResponse stopContainer(StopContainerRequest request)
throws YarnRemoteException {
throws YarnRemoteException, IOException {
ContainerId containerID = request.getContainerId();
String containerIDStr = containerID.toString();
@@ -545,7 +545,8 @@ public class ContainerManagerImpl extends CompositeService implements
@Override
public GetContainerStatusResponse getContainerStatus(
GetContainerStatusRequest request) throws YarnRemoteException {
GetContainerStatusRequest request) throws YarnRemoteException,
IOException {
ContainerId containerID = request.getContainerId();
String containerIDStr = containerID.toString();

View File

@@ -301,6 +301,8 @@ public class TestNodeManagerResync {
// class name after YARN-142
Assert.assertTrue(e.getRemoteTrace().contains(
NMNotYetReadyException.class.getName()));
} catch (IOException e) {
assertionFailedInThread.set(true);
}
}
// no. of containers to be launched should equal to no. of

View File

@@ -185,13 +185,13 @@ public abstract class BaseContainerManagerTest {
public static void waitForContainerState(ContainerManager containerManager,
ContainerId containerID, ContainerState finalState)
throws InterruptedException, YarnRemoteException {
throws InterruptedException, YarnRemoteException, IOException {
waitForContainerState(containerManager, containerID, finalState, 20);
}
public static void waitForContainerState(ContainerManager containerManager,
ContainerId containerID, ContainerState finalState, int timeOutMax)
throws InterruptedException, YarnRemoteException {
throws InterruptedException, YarnRemoteException, IOException {
GetContainerStatusRequest request =
recordFactory.newRecordInstance(GetContainerStatusRequest.class);
request.setContainerId(containerID);
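The body of waitForContainerState is not part of this hunk; a plausible sketch of the polling loop it implies, written only to show where the newly declared IOException can surface (details assumed, not taken from the commit):

GetContainerStatusRequest request =
    recordFactory.newRecordInstance(GetContainerStatusRequest.class);
request.setContainerId(containerID);
int waited = 0;
// getContainerStatus(...) may now throw IOException as well as
// YarnRemoteException, which is why the signatures above widened.
ContainerState state =
    containerManager.getContainerStatus(request).getStatus().getState();
while (!state.equals(finalState) && waited++ < timeOutMax) {
  Thread.sleep(1000);
  state = containerManager.getContainerStatus(request).getStatus().getState();
}
Assert.assertEquals("Container did not reach " + finalState, finalState, state);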

View File

@@ -334,10 +334,6 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
LOG.info("Recovering " + appStates.size() + " applications");
for(ApplicationState appState : appStates.values()) {
boolean shouldRecover = true;
// re-submit the application
// this is going to send an app start event but since the async dispatcher
// has not started that event will be queued until we have completed re
// populating the state
if(appState.getApplicationSubmissionContext().getUnmanagedAM()) {
// do not recover unmanaged applications since current recovery
// mechanism of restarting attempts does not work for them.
@@ -367,6 +363,10 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
shouldRecover = false;
}
// re-submit the application
// this is going to send an app start event but since the async dispatcher
// has not started that event will be queued until we have completed re
// populating the state
if(shouldRecover) {
LOG.info("Recovering application " + appState.getAppId());
submitApplication(appState.getApplicationSubmissionContext(),

View File

@@ -34,7 +34,6 @@ import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
@@ -48,7 +47,6 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -203,25 +201,16 @@ public class AMLauncher implements Runnable {
credentials.readTokenStorageStream(dibb);
}
ApplicationTokenIdentifier id = new ApplicationTokenIdentifier(
application.getAppAttemptId());
Token<ApplicationTokenIdentifier> appMasterToken =
new Token<ApplicationTokenIdentifier>(id,
this.rmContext.getApplicationTokenSecretManager());
InetSocketAddress serviceAddr = conf.getSocketAddr(
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
// normally the client should set the service after acquiring the token,
// but this token is directly provided to the AMs
SecurityUtil.setTokenService(appMasterToken, serviceAddr);
// Add the ApplicationMaster token
credentials.addToken(appMasterToken.getService(), appMasterToken);
// Add application token
Token<ApplicationTokenIdentifier> applicationToken =
application.getApplicationToken();
if(applicationToken != null) {
credentials.addToken(applicationToken.getService(), applicationToken);
}
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
container.setContainerTokens(
ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
container.setContainerTokens(ByteBuffer.wrap(dob.getData(), 0,
dob.getLength()));
SecretKey clientSecretKey =
this.rmContext.getClientToAMTokenSecretManager().getMasterKey(

View File

@@ -31,13 +31,15 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -114,8 +116,17 @@ public class FileSystemRMStateStore extends RMStateStore {
ApplicationAttemptStateDataPBImpl attemptStateData =
new ApplicationAttemptStateDataPBImpl(
ApplicationAttemptStateDataProto.parseFrom(childData));
ApplicationAttemptState attemptState = new ApplicationAttemptState(
attemptId, attemptStateData.getMasterContainer());
Credentials credentials = null;
if(attemptStateData.getAppAttemptTokens() != null){
credentials = new Credentials();
DataInputByteBuffer dibb = new DataInputByteBuffer();
dibb.reset(attemptStateData.getAppAttemptTokens());
credentials.readTokenStorageStream(dibb);
}
ApplicationAttemptState attemptState =
new ApplicationAttemptState(attemptId,
attemptStateData.getMasterContainer(), credentials);
// assert child node name is same as application attempt id
assert attemptId.equals(attemptState.getAttemptId());
attempts.add(attemptState);
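FileSystemRMStateStore and MemoryRMStateStore now rehydrate attempt tokens the same way, and RMStateStore (further down) performs the inverse when storing. The round trip, condensed into two self-contained helpers (the utility class is illustrative; every call on it is taken from this commit):

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;

final class AttemptTokenBufferUtil {
  private AttemptTokenBufferUtil() {}

  // Store side: Credentials -> ByteBuffer for the PB state record.
  static ByteBuffer toBuffer(Credentials credentials) throws IOException {
    DataOutputBuffer dob = new DataOutputBuffer();
    credentials.writeTokenStorageToStream(dob);
    return ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
  }

  // Load side: ByteBuffer -> Credentials when recovering an attempt.
  static Credentials fromBuffer(ByteBuffer tokens) throws IOException {
    Credentials credentials = new Credentials();
    DataInputByteBuffer dibb = new DataInputByteBuffer();
    dibb.reset(tokens);
    credentials.readTokenStorageStream(dibb);
    return credentials;
  }
}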

View File

@@ -23,10 +23,12 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -79,8 +81,16 @@ public class MemoryRMStateStore extends RMStateStore {
throws Exception {
ApplicationAttemptId attemptId = ConverterUtils
.toApplicationAttemptId(attemptIdStr);
ApplicationAttemptState attemptState = new ApplicationAttemptState(
attemptId, attemptStateData.getMasterContainer());
Credentials credentials = null;
if(attemptStateData.getAppAttemptTokens() != null){
DataInputByteBuffer dibb = new DataInputByteBuffer();
credentials = new Credentials();
dibb.reset(attemptStateData.getAppAttemptTokens());
credentials.readTokenStorageStream(dibb);
}
ApplicationAttemptState attemptState =
new ApplicationAttemptState(attemptId,
attemptStateData.getMasterContainer(), credentials);
ApplicationState appState = state.getApplicationState().get(
attemptState.getAttemptId().getApplicationId());

View File

@@ -20,8 +20,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
@Unstable
public class NullRMStateStore extends RMStateStore {

View File

@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
@@ -26,6 +27,9 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -34,8 +38,10 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPB
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -58,19 +64,25 @@ public abstract class RMStateStore {
public static class ApplicationAttemptState {
final ApplicationAttemptId attemptId;
final Container masterContainer;
final Credentials appAttemptTokens;
public ApplicationAttemptState(ApplicationAttemptId attemptId,
Container masterContainer) {
Container masterContainer,
Credentials appAttemptTokens) {
this.attemptId = attemptId;
this.masterContainer = masterContainer;
this.appAttemptTokens = appAttemptTokens;
}
public Container getMasterContainer() {
return masterContainer;
}
public ApplicationAttemptId getAttemptId() {
return attemptId;
}
public Credentials getAppAttemptTokens() {
return appAttemptTokens;
}
}
/**
@@ -199,10 +211,14 @@
* RMAppAttemptStoredEvent will be sent on completion to notify the RMAppAttempt
*/
public synchronized void storeApplicationAttempt(RMAppAttempt appAttempt) {
ApplicationAttemptState attemptState = new ApplicationAttemptState(
appAttempt.getAppAttemptId(), appAttempt.getMasterContainer());
Credentials credentials = getTokensFromAppAttempt(appAttempt);
ApplicationAttemptState attemptState =
new ApplicationAttemptState(appAttempt.getAppAttemptId(),
appAttempt.getMasterContainer(), credentials);
dispatcher.getEventHandler().handle(
new RMStateStoreAppAttemptEvent(attemptState));
new RMStateStoreAppAttemptEvent(attemptState));
}
/**
@@ -226,8 +242,10 @@
ApplicationState appState = new ApplicationState(
app.getSubmitTime(), app.getApplicationSubmissionContext());
for(RMAppAttempt appAttempt : app.getAppAttempts().values()) {
ApplicationAttemptState attemptState = new ApplicationAttemptState(
appAttempt.getAppAttemptId(), appAttempt.getMasterContainer());
Credentials credentials = getTokensFromAppAttempt(appAttempt);
ApplicationAttemptState attemptState =
new ApplicationAttemptState(appAttempt.getAppAttemptId(),
appAttempt.getMasterContainer(), credentials);
appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
@@ -249,7 +267,20 @@
*/
protected abstract void removeApplicationState(ApplicationState appState)
throws Exception;
private Credentials getTokensFromAppAttempt(RMAppAttempt appAttempt) {
Credentials credentials = new Credentials();
Token<ApplicationTokenIdentifier> appToken = appAttempt.getApplicationToken();
if(appToken != null){
credentials.addToken(appToken.getService(), appToken);
}
Token<ClientTokenIdentifier> clientToken = appAttempt.getClientToken();
if(clientToken != null){
credentials.addToken(clientToken.getService(), clientToken);
}
return credentials;
}
// Dispatcher related code
private synchronized void handleStoreEvent(RMStateStoreEvent event) {
@@ -283,13 +314,22 @@
ApplicationAttemptState attemptState =
((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
Exception storedException = null;
ApplicationAttemptStateDataPBImpl attemptStateData =
new ApplicationAttemptStateDataPBImpl();
attemptStateData.setAttemptId(attemptState.getAttemptId());
attemptStateData.setMasterContainer(attemptState.getMasterContainer());
LOG.info("Storing info for attempt: " + attemptState.getAttemptId());
Credentials credentials = attemptState.getAppAttemptTokens();
ByteBuffer appAttemptTokens = null;
try {
if(credentials != null){
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
appAttemptTokens =
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
}
ApplicationAttemptStateDataPBImpl attemptStateData =
(ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl
.newApplicationAttemptStateData(attemptState.getAttemptId(),
attemptState.getMasterContainer(), appAttemptTokens);
LOG.info("Storing info for attempt: " + attemptState.getAttemptId());
storeApplicationAttemptState(attemptState.getAttemptId().toString(),
attemptStateData);
} catch (Exception e) {
@@ -358,7 +398,5 @@
public void handle(RMStateStoreEvent event) {
handleStoreEvent(event);
}
}
}
}

View File

@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -49,4 +51,14 @@ public interface ApplicationAttemptStateData {
public Container getMasterContainer();
public void setMasterContainer(Container container);
/**
* The application attempt tokens that belong to this attempt
* @return The application attempt tokens that belong to this attempt
*/
@Public
@Unstable
public ByteBuffer getAppAttemptTokens();
public void setAppAttemptTokens(ByteBuffer attemptTokens);
}

View File

@@ -16,20 +16,27 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb;
import java.nio.ByteBuffer;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProtoOrBuilder;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
public class ApplicationAttemptStateDataPBImpl
extends ProtoBase<ApplicationAttemptStateDataProto>
implements ApplicationAttemptStateData {
private static final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
ApplicationAttemptStateDataProto proto =
ApplicationAttemptStateDataProto.getDefaultInstance();
ApplicationAttemptStateDataProto.Builder builder = null;
@@ -37,7 +44,8 @@ implements ApplicationAttemptStateData {
private ApplicationAttemptId attemptId = null;
private Container masterContainer = null;
private ByteBuffer appAttemptTokens = null;
public ApplicationAttemptStateDataPBImpl() {
builder = ApplicationAttemptStateDataProto.newBuilder();
}
@@ -62,6 +70,9 @@ implements ApplicationAttemptStateData {
if(this.masterContainer != null) {
builder.setMasterContainer(((ContainerPBImpl)masterContainer).getProto());
}
if(this.appAttemptTokens != null) {
builder.setAppAttemptTokens(convertToProtoFormat(this.appAttemptTokens));
}
}
private void mergeLocalToProto() {
@@ -123,4 +134,36 @@ implements ApplicationAttemptStateData {
this.masterContainer = container;
}
@Override
public ByteBuffer getAppAttemptTokens() {
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
if(appAttemptTokens != null) {
return appAttemptTokens;
}
if(!p.hasAppAttemptTokens()) {
return null;
}
this.appAttemptTokens = convertFromProtoFormat(p.getAppAttemptTokens());
return appAttemptTokens;
}
@Override
public void setAppAttemptTokens(ByteBuffer attemptTokens) {
maybeInitBuilder();
if(attemptTokens == null) {
builder.clearAppAttemptTokens();
}
this.appAttemptTokens = attemptTokens;
}
public static ApplicationAttemptStateData newApplicationAttemptStateData(
ApplicationAttemptId attemptId, Container container,
ByteBuffer attemptTokens) {
ApplicationAttemptStateData attemptStateData =
recordFactory.newRecordInstance(ApplicationAttemptStateData.class);
attemptStateData.setAttemptId(attemptId);
attemptStateData.setMasterContainer(container);
attemptStateData.setAppAttemptTokens(attemptTokens);
return attemptStateData;
}
}
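A hypothetical caller of the new factory, mirroring how handleStoreEvent in RMStateStore packages an attempt before persisting it:

ApplicationAttemptStateData stateData =
    ApplicationAttemptStateDataPBImpl.newApplicationAttemptStateData(
        attemptState.getAttemptId(),
        attemptState.getMasterContainer(),
        appAttemptTokens); // serialized Credentials, or null when none exist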

View File

@@ -16,13 +16,14 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProtoOrBuilder;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
public class ApplicationStateDataPBImpl
extends ProtoBase<ApplicationStateDataProto>

View File

@@ -32,6 +32,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
@@ -443,7 +445,14 @@ public class RMAppImpl implements RMApp, Recoverable {
currentApplicationAttemptId = this.currentAttempt.getAppAttemptId();
trackingUrl = this.currentAttempt.getTrackingUrl();
origTrackingUrl = this.currentAttempt.getOriginalTrackingUrl();
clientToken = this.currentAttempt.getClientToken();
Token<ClientTokenIdentifier> attemptClientToken =
this.currentAttempt.getClientToken();
if (attemptClientToken != null) {
clientToken =
BuilderUtils.newClientToken(attemptClientToken.getIdentifier(),
attemptClientToken.getKind().toString(), attemptClientToken
.getPassword(), attemptClientToken.getService().toString());
}
host = this.currentAttempt.getHost();
rpcPort = this.currentAttempt.getRpcPort();
appUsageReport = currentAttempt.getApplicationResourceUsageReport();

View File

@@ -21,16 +21,18 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ClientToken;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
/**
@@ -92,7 +94,7 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
* The token required by the clients to talk to the application attempt
* @return the token required by the clients to talk to the application attempt
*/
ClientToken getClientToken();
Token<ClientTokenIdentifier> getClientToken();
/**
* Diagnostics information for the application attempt.
@@ -146,6 +148,12 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
*/
ApplicationSubmissionContext getSubmissionContext();
/**
* The application token belonging to this app attempt
* @return The application token belonging to this app attempt
*/
Token<ApplicationTokenIdentifier> getApplicationToken();
/**
* Get application container and resource usage information.
* @return an ApplicationResourceUsageReport object.

View File

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
import static org.apache.hadoop.yarn.util.StringHelper.pjoin;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
@@ -38,6 +39,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ExitUtil;
@@ -45,7 +49,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ClientToken;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -58,7 +61,10 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.security.ApplicationTokenSelector;
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientTokenSelector;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
@@ -123,8 +129,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
private final WriteLock writeLock;
private final ApplicationAttemptId applicationAttemptId;
private ClientToken clientToken;
private Token<ClientTokenIdentifier> clientToken;
private final ApplicationSubmissionContext submissionContext;
private Token<ApplicationTokenIdentifier> applicationToken = null;
//nodes on which this attempt's containers ran
private final Set<NodeId> ranNodes =
@@ -366,19 +373,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
this.scheduler = scheduler;
this.masterService = masterService;
if (UserGroupInformation.isSecurityEnabled()) {
this.rmContext.getClientToAMTokenSecretManager().registerApplication(
appAttemptId);
Token<ClientTokenIdentifier> token =
new Token<ClientTokenIdentifier>(new ClientTokenIdentifier(
appAttemptId), this.rmContext.getClientToAMTokenSecretManager());
this.clientToken =
BuilderUtils.newClientToken(token.getIdentifier(), token.getKind()
.toString(), token.getPassword(), token.getService().toString());
}
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
@@ -502,10 +496,15 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
}
@Override
public ClientToken getClientToken() {
public Token<ClientTokenIdentifier> getClientToken() {
return this.clientToken;
}
@Override
public Token<ApplicationTokenIdentifier> getApplicationToken() {
return this.applicationToken;
}
@Override
public String getDiagnostics() {
this.readLock.lock();
@@ -657,14 +656,42 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
ApplicationAttemptState attemptState = appState.getAttempt(getAppAttemptId());
assert attemptState != null;
setMasterContainer(attemptState.getMasterContainer());
LOG.info("Recovered attempt: AppId: " + getAppAttemptId().getApplicationId()
recoverAppAttemptTokens(attemptState.getAppAttemptTokens());
LOG.info("Recovered attempt: AppId: " + getAppAttemptId().getApplicationId()
+ " AttemptId: " + getAppAttemptId()
+ " MasterContainer: " + masterContainer);
setDiagnostics("Attempt recovered after RM restart");
handle(new RMAppAttemptEvent(getAppAttemptId(),
RMAppAttemptEventType.RECOVER));
}
private void recoverAppAttemptTokens(Credentials appAttemptTokens) {
if (appAttemptTokens == null) {
return;
}
if (UserGroupInformation.isSecurityEnabled()) {
ClientTokenSelector clientTokenSelector = new ClientTokenSelector();
this.clientToken =
clientTokenSelector.selectToken(new Text(),
appAttemptTokens.getAllTokens());
InetSocketAddress serviceAddr = conf.getSocketAddr(
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
ApplicationTokenSelector appTokenSelector = new ApplicationTokenSelector();
this.applicationToken =
appTokenSelector.selectToken(
SecurityUtil.buildTokenService(serviceAddr),
appAttemptTokens.getAllTokens());
// For now, no need to populate tokens back to
// ApplicationTokenSecretManager, because running attempts are rebooted
// Later in work-preserving restart, we'll create a NEW->RUNNING transition
// in which the restored tokens will be added to the secret manager
}
}
private static class BaseTransition implements
SingleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent> {
@@ -686,6 +713,36 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
appAttempt.masterService
.registerAppAttempt(appAttempt.applicationAttemptId);
if (UserGroupInformation.isSecurityEnabled()) {
appAttempt.rmContext.getClientToAMTokenSecretManager()
.registerApplication(appAttempt.applicationAttemptId);
// create clientToken
appAttempt.clientToken =
new Token<ClientTokenIdentifier>(new ClientTokenIdentifier(
appAttempt.applicationAttemptId),
appAttempt.rmContext.getClientToAMTokenSecretManager());
// create application token
ApplicationTokenIdentifier id =
new ApplicationTokenIdentifier(appAttempt.applicationAttemptId);
Token<ApplicationTokenIdentifier> applicationToken =
new Token<ApplicationTokenIdentifier>(id,
appAttempt.rmContext.getApplicationTokenSecretManager());
InetSocketAddress serviceAddr =
appAttempt.conf.getSocketAddr(
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
// normally the client should set the service after acquiring the
// token, but this token is directly provided to the AMs
SecurityUtil.setTokenService(applicationToken, serviceAddr);
appAttempt.applicationToken = applicationToken;
}
// Add the application to the scheduler
appAttempt.eventHandler.handle(
new AppAddedSchedulerEvent(appAttempt.applicationAttemptId,
@@ -992,7 +1049,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
appAttempt.rmContext.getAMFinishingMonitor().unregister(
appAttempt.getAppAttemptId());
// Unregister from the ClientTokenSecretManager
if (UserGroupInformation.isSecurityEnabled()) {
appAttempt.rmContext.getClientToAMTokenSecretManager()
@@ -1191,7 +1247,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
this.readLock.unlock();
}
}
private void launchAttempt(){
// Send event to launch the AM Container
eventHandler.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH, this));
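recoverAppAttemptTokens above leans on the standard Hadoop TokenSelector pattern: given a service name and the full token collection, a selector returns the matching token or null. Condensed to its core (variable names assumed):

// Pick the application token bound to the scheduler address out of the
// recovered credentials; yields null if no such token was persisted.
Text service = SecurityUtil.buildTokenService(schedulerAddress);
Token<ApplicationTokenIdentifier> appToken =
    new ApplicationTokenSelector().selectToken(service,
        credentials.getAllTokens());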

View File

@@ -404,7 +404,8 @@ public class TestRMRestart {
}
@Test
public void testTokenRestoredOnRMrestart() throws Exception {
public void testDelegationTokenRestoredInDelegationTokenRenewer()
throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
ExitUtil.disableSystemExit();
@@ -423,7 +424,7 @@
Map<ApplicationId, ApplicationState> rmAppState =
rmState.getApplicationState();
MockRM rm1 = new MyMockRM(conf, memStore);
MockRM rm1 = new TestSecurityMockRM(conf, memStore);
rm1.start();
HashSet<Token<RMDelegationTokenIdentifier>> tokenSet =
@@ -461,21 +462,26 @@
ApplicationState appState = rmAppState.get(app.getApplicationId());
Assert.assertNotNull(appState);
// assert delegation tokens exist in rm1 DelegationTokenRenewer
Assert.assertEquals(tokenSet, rm1.getRMContext()
.getDelegationTokenRenewer().getDelegationTokens());
// assert delegation tokens are saved
DataOutputBuffer dob = new DataOutputBuffer();
ts.writeTokenStorageToStream(dob);
ByteBuffer securityTokens =
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
securityTokens.rewind();
Assert.assertEquals(securityTokens, appState
.getApplicationSubmissionContext().getAMContainerSpec()
.getContainerTokens());
// start new RM
MockRM rm2 = new MyMockRM(conf, memStore);
MockRM rm2 = new TestSecurityMockRM(conf, memStore);
rm2.start();
// verify tokens are properly populated back to DelegationTokenRenewer
Assert.assertEquals(tokenSet, rm1.getRMContext()
// verify tokens are properly populated back to rm2 DelegationTokenRenewer
Assert.assertEquals(tokenSet, rm2.getRMContext()
.getDelegationTokenRenewer().getDelegationTokens());
// stop the RM
@@ -483,9 +489,92 @@
rm2.stop();
}
class MyMockRM extends MockRM {
@Test
public void testAppAttemptTokensRestoredOnRMRestart() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
ExitUtil.disableSystemExit();
public MyMockRM(Configuration conf, RMStateStore store) {
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
UserGroupInformation.setConfiguration(conf);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
RMState rmState = memStore.getState();
Map<ApplicationId, ApplicationState> rmAppState =
rmState.getApplicationState();
MockRM rm1 = new TestSecurityMockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("0.0.0.0:4321", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
// submit an app
RMApp app1 =
rm1.submitApp(200, "name", "user",
new HashMap<ApplicationAccessType, String>(), "default");
// assert app info is saved
ApplicationState appState = rmAppState.get(app1.getApplicationId());
Assert.assertNotNull(appState);
// Allocate the AM
nm1.nodeHeartbeat(true);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
ApplicationAttemptId attemptId1 = attempt1.getAppAttemptId();
rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
// assert attempt info is saved
ApplicationAttemptState attemptState = appState.getAttempt(attemptId1);
Assert.assertNotNull(attemptState);
Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
attemptState.getMasterContainer().getId());
// collect the appToken and clientToken that were generated when the RMAppAttempt was created
HashSet<Token<?>> tokenSet = new HashSet<Token<?>>();
tokenSet.add(attempt1.getApplicationToken());
tokenSet.add(attempt1.getClientToken());
// assert application Token is saved
HashSet<Token<?>> savedTokens = new HashSet<Token<?>>();
savedTokens.addAll(attemptState.getAppAttemptTokens().getAllTokens());
Assert.assertEquals(tokenSet, savedTokens);
// start new RM
MockRM rm2 = new TestSecurityMockRM(conf, memStore);
rm2.start();
RMApp loadedApp1 =
rm2.getRMContext().getRMApps().get(app1.getApplicationId());
RMAppAttempt loadedAttempt1 = loadedApp1.getRMAppAttempt(attemptId1);
// assert loaded attempt recovered attempt tokens
Assert.assertNotNull(loadedAttempt1);
savedTokens.clear();
savedTokens.add(loadedAttempt1.getApplicationToken());
savedTokens.add(loadedAttempt1.getClientToken());
Assert.assertEquals(tokenSet, savedTokens);
// assert clientToken is recovered back to api-versioned clientToken
Assert.assertEquals(attempt1.getClientToken(),
loadedAttempt1.getClientToken());
// Not testing that ApplicationTokenSecretManager has the password populated back;
// that is needed in work-preserving restart
rm1.stop();
rm2.stop();
}
class TestSecurityMockRM extends MockRM {
public TestSecurityMockRM(Configuration conf, RMStateStore store) {
super(conf, store);
}

View File

@@ -18,14 +18,19 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.junit.Test;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -34,6 +39,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -44,13 +51,18 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Test;
public class TestRMStateStore {
@@ -141,7 +153,7 @@ public class TestRMStateStore {
ApplicationAttemptId attemptId = ConverterUtils.toApplicationAttemptId(
"appattempt_1352994193343_0003_000001");
storeAttempt(testStore, attemptId,
"container_1352994193343_0003_01_000001", dispatcher);
"container_1352994193343_0003_01_000001", null, null, dispatcher);
}
@Override
@@ -186,14 +198,17 @@
}
ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId,
String containerIdStr, TestDispatcher dispatcher)
throws Exception {
String containerIdStr, Token<ApplicationTokenIdentifier> appToken,
Token<ClientTokenIdentifier> clientToken, TestDispatcher dispatcher)
throws Exception {
Container container = new ContainerPBImpl();
container.setId(ConverterUtils.toContainerId(containerIdStr));
RMAppAttempt mockAttempt = mock(RMAppAttempt.class);
when(mockAttempt.getAppAttemptId()).thenReturn(attemptId);
when(mockAttempt.getMasterContainer()).thenReturn(container);
when(mockAttempt.getApplicationToken()).thenReturn(appToken);
when(mockAttempt.getClientToken()).thenReturn(clientToken);
dispatcher.attemptId = attemptId;
dispatcher.storedException = null;
store.storeApplicationAttempt(mockAttempt);
@@ -201,30 +216,58 @@
return container.getId();
}
@SuppressWarnings("unchecked")
void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception {
long submitTime = System.currentTimeMillis();
Configuration conf = new YarnConfiguration();
RMStateStore store = stateStoreHelper.getRMStateStore();
TestDispatcher dispatcher = new TestDispatcher();
store.setDispatcher(dispatcher);
ApplicationTokenSecretManager appTokenMgr =
new ApplicationTokenSecretManager(conf);
ClientToAMTokenSecretManagerInRM clientTokenMgr =
new ClientToAMTokenSecretManagerInRM();
ApplicationAttemptId attemptId1 = ConverterUtils
.toApplicationAttemptId("appattempt_1352994193343_0001_000001");
ApplicationId appId1 = attemptId1.getApplicationId();
storeApp(store, appId1, submitTime);
// create application token1 for attempt1
List<Token<?>> appAttemptToken1 =
generateTokens(attemptId1, appTokenMgr, clientTokenMgr, conf);
HashSet<Token<?>> attemptTokenSet1 = new HashSet<Token<?>>();
attemptTokenSet1.addAll(appAttemptToken1);
ContainerId containerId1 = storeAttempt(store, attemptId1,
"container_1352994193343_0001_01_000001", dispatcher);
"container_1352994193343_0001_01_000001",
(Token<ApplicationTokenIdentifier>) (appAttemptToken1.get(0)),
(Token<ClientTokenIdentifier>)(appAttemptToken1.get(1)),
dispatcher);
String appAttemptIdStr2 = "appattempt_1352994193343_0001_000002";
ApplicationAttemptId attemptId2 =
ConverterUtils.toApplicationAttemptId(appAttemptIdStr2);
ConverterUtils.toApplicationAttemptId(appAttemptIdStr2);
// create application token2 for attempt2
List<Token<?>> appAttemptToken2 =
generateTokens(attemptId2, appTokenMgr, clientTokenMgr, conf);
HashSet<Token<?>> attemptTokenSet2 = new HashSet<Token<?>>();
attemptTokenSet2.addAll(appAttemptToken2);
ContainerId containerId2 = storeAttempt(store, attemptId2,
"container_1352994193343_0001_02_000001", dispatcher);
"container_1352994193343_0001_02_000001",
(Token<ApplicationTokenIdentifier>) (appAttemptToken2.get(0)),
(Token<ClientTokenIdentifier>)(appAttemptToken2.get(1)),
dispatcher);
ApplicationAttemptId attemptIdRemoved = ConverterUtils
.toApplicationAttemptId("appattempt_1352994193343_0002_000001");
ApplicationId appIdRemoved = attemptIdRemoved.getApplicationId();
storeApp(store, appIdRemoved, submitTime);
storeAttempt(store, attemptIdRemoved,
"container_1352994193343_0002_01_000001", dispatcher);
"container_1352994193343_0002_01_000001", null, null, dispatcher);
RMApp mockRemovedApp = mock(RMApp.class);
HashMap<ApplicationAttemptId, RMAppAttempt> attempts =
@@ -268,12 +311,21 @@
assertEquals(attemptId1, attemptState.getAttemptId());
// attempt1 container is loaded correctly
assertEquals(containerId1, attemptState.getMasterContainer().getId());
// attempt1 applicationToken is loaded correctly
HashSet<Token<?>> savedTokens = new HashSet<Token<?>>();
savedTokens.addAll(attemptState.getAppAttemptTokens().getAllTokens());
assertEquals(attemptTokenSet1, savedTokens);
attemptState = appState.getAttempt(attemptId2);
// attempt2 is loaded correctly
assertNotNull(attemptState);
assertEquals(attemptId2, attemptState.getAttemptId());
// attempt2 container is loaded correctly
assertEquals(containerId2, attemptState.getMasterContainer().getId());
// attempt2 applicationToken is loaded correctly
savedTokens.clear();
savedTokens.addAll(attemptState.getAppAttemptTokens().getAllTokens());
assertEquals(attemptTokenSet2, savedTokens);
// assert store is in expected state after everything is cleaned
assertTrue(stateStoreHelper.isFinalStateValid());
@@ -281,4 +333,23 @@
store.close();
}
private List<Token<?>> generateTokens(ApplicationAttemptId attemptId,
ApplicationTokenSecretManager appTokenMgr,
ClientToAMTokenSecretManagerInRM clientTokenMgr, Configuration conf) {
ApplicationTokenIdentifier appTokenId =
new ApplicationTokenIdentifier(attemptId);
Token<ApplicationTokenIdentifier> appToken =
new Token<ApplicationTokenIdentifier>(appTokenId, appTokenMgr);
appToken.setService(new Text("appToken service"));
ClientTokenIdentifier clientTokenId = new ClientTokenIdentifier(attemptId);
clientTokenMgr.registerApplication(attemptId);
Token<ClientTokenIdentifier> clientToken =
new Token<ClientTokenIdentifier>(clientTokenId, clientTokenMgr);
clientToken.setService(new Text("clientToken service"));
List<Token<?>> tokenPair = new ArrayList<Token<?>>();
tokenPair.add(0, appToken);
tokenPair.add(1, clientToken);
return tokenPair;
}
}

View File

@@ -537,6 +537,9 @@ public class TestContainerManagerSecurity {
"Unauthorized request to start container. "
+ "\nExpected containerId: " + tokenId.getContainerID()
+ " Found: " + newContainerId.toString()));
} catch (IOException e) {
LOG.info("Got IOException: ",e);
fail("IOException is not expected.");
}
}
@@ -563,6 +566,9 @@
Assert.assertTrue(e.getMessage().contains(
"\nExpected resource " + tokenId.getResource().toString()
+ " but found " + container.getResource().toString()));
} catch (IOException e) {
LOG.info("Got IOException: ",e);
fail("IOException is not expected.");
}
}
@@ -591,6 +597,9 @@
Assert.assertTrue(e.getMessage().contains(
"Expected user-name " + tokenId.getApplicationSubmitter()
+ " but found " + context.getUser()));
} catch (IOException e) {
LOG.info("Got IOException: ",e);
fail("IOException is not expected.");
}
}

View File

@@ -124,7 +124,7 @@ Hadoop MapReduce Next Generation - Fair Scheduler
* The smallest container size the scheduler can allocate, in MB of memory.
* <<<yarn.scheduler.fair.minimum-allocation-mb>>>
* <<<yarn.scheduler.fair.maximum-allocation-mb>>>
* The largest container the scheduler can allocate, in MB of memory.
@@ -177,7 +177,7 @@ Hadoop MapReduce Next Generation - Fair Scheduler
Allocation file format
The allocation file must be in XML format. The format contains three types of
The allocation file must be in XML format. The format contains four types of
elements:
* <<Queue elements>>, which represent queues. Each may contain the following